Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1BDB2200C6B for ; Tue, 18 Apr 2017 07:25:21 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1A71B160BB8; Tue, 18 Apr 2017 05:25:21 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2A995160BB4 for ; Tue, 18 Apr 2017 07:25:18 +0200 (CEST) Received: (qmail 33600 invoked by uid 500); 18 Apr 2017 05:25:17 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 32885 invoked by uid 99); 18 Apr 2017 05:25:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 Apr 2017 05:25:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 280BBDFCB3; Tue, 18 Apr 2017 05:25:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Tue, 18 Apr 2017 05:25:21 -0000 Message-Id: <28b79991e0114c889cd2b330027e3643@git.apache.org> In-Reply-To: <56ffe51847964a9e957e4f5cba73318a@git.apache.org> References: <56ffe51847964a9e957e4f5cba73318a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [06/54] [abbrv] ignite git commit: ignite-4587 CacheAtomicWriteOrderMode.CLOCK mode is removed archived-at: Tue, 18 Apr 2017 05:25:21 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshot.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshot.java index 36f2902..e69de29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshot.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshot.java @@ -1,235 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.clock; - -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.UUID; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.internal.S; - -/** - * Snapshot of time deltas for given topology. - */ -public class GridClockDeltaSnapshot { - /** Time delta version. */ - private final GridClockDeltaVersion ver; - - /** Deltas between coordinator and nodes by node ID. */ - private final Map deltas; - - /** Pending delta values. */ - @GridToStringExclude - private final Map pendingDeltas; - - /** - * @param ver Snapshot version. - * @param locNodeId Local node ID. - * @param discoSnap Discovery snapshot. - * @param avgSize Average size. - */ - public GridClockDeltaSnapshot( - GridClockDeltaVersion ver, - UUID locNodeId, - GridDiscoveryTopologySnapshot discoSnap, - int avgSize - ) { - assert ver.topologyVersion() == discoSnap.topologyVersion(); - - this.ver = ver; - - deltas = new HashMap<>(discoSnap.topologyNodes().size(), 1.0f); - - pendingDeltas = new HashMap<>(discoSnap.topologyNodes().size(), 1.0f); - - for (ClusterNode n : discoSnap.topologyNodes()) { - if (!locNodeId.equals(n.id())) - pendingDeltas.put(n.id(), new DeltaAverage(avgSize)); - } - } - - /** - * @param ver Snapshot version. - * @param deltas Deltas map. - */ - public GridClockDeltaSnapshot(GridClockDeltaVersion ver, Map deltas) { - this.ver = ver; - this.deltas = deltas; - - pendingDeltas = Collections.emptyMap(); - } - - /** - * @return Version. - */ - public GridClockDeltaVersion version() { - return ver; - } - - /** - * @return Map of collected deltas. - */ - public Map deltas() { - return deltas; - } - - /** - * Awaits either until snapshot is ready or timeout elapses. - * - * @param timeout Timeout to wait. - * @throws IgniteInterruptedCheckedException If wait was interrupted. - */ - public synchronized void awaitReady(long timeout) throws IgniteInterruptedCheckedException { - long start = System.currentTimeMillis(); - - try { - while (!ready()) { - long now = System.currentTimeMillis(); - - if (start + timeout - now <= 0) - return; - - wait(start + timeout - now); - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteInterruptedCheckedException(e); - } - } - - /** - * Callback invoked when time delta is received from remote node. - * - * @param nodeId Node ID. - * @param timeDelta Calculated time delta. - * @return {@code True} if more samples needed from that node. - */ - public synchronized boolean onDeltaReceived(UUID nodeId, long timeDelta) { - DeltaAverage avg = pendingDeltas.get(nodeId); - - if (avg != null) { - avg.onValue(timeDelta); - - if (avg.ready()) { - pendingDeltas.remove(nodeId); - - deltas.put(nodeId, avg.average()); - - if (ready()) - notifyAll(); - - return false; - } - - return true; - } - - return false; - } - - /** - * Callback invoked when node left. - * - * @param nodeId Left node ID. - */ - public synchronized void onNodeLeft(UUID nodeId) { - pendingDeltas.remove(nodeId); - - deltas.put(nodeId, 0L); - - if (ready()) - notifyAll(); - } - - /** - * @return {@code True} if snapshot is ready. - */ - public synchronized boolean ready() { - return pendingDeltas.isEmpty(); - } - - /** - * @return Collection of node IDs for which response was not received so far. - */ - public synchronized Collection pendingNodeIds() { - // Must return copy. - return new HashSet<>(pendingDeltas.keySet()); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridClockDeltaSnapshot.class, this); - } - - /** - * Delta average. - */ - private static class DeltaAverage { - /** Delta values. */ - private long[] vals; - - /** Current index. */ - private int idx; - - /** - * @param size Accumulator size. - */ - private DeltaAverage(int size) { - vals = new long[size]; - } - - /** - * Adds value to accumulator. - * - * @param val Value to add. - */ - public void onValue(long val) { - if (idx < vals.length) - vals[idx++] = val; - } - - /** - * Whether this average is complete. - * - * @return {@code True} if enough values is collected. - */ - public boolean ready() { - return idx == vals.length; - } - - /** - * @return Average delta. - */ - public long average() { - long sum = 0; - - for (long val : vals) - sum += val; - - return sum / vals.length; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshotMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshotMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshotMessage.java index 4306d7e..e69de29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshotMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshotMessage.java @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.clock; - -import java.io.Externalizable; -import java.nio.ByteBuffer; -import java.util.Map; -import java.util.UUID; -import org.apache.ignite.internal.GridDirectMap; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; - -/** - * Message containing time delta map for all nodes. - */ -public class GridClockDeltaSnapshotMessage implements Message { - /** */ - private static final long serialVersionUID = 0L; - - /** Snapshot version. */ - private GridClockDeltaVersion snapVer; - - /** Grid time deltas. */ - @GridToStringInclude - @GridDirectMap(keyType = UUID.class, valueType = long.class) - private Map deltas; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public GridClockDeltaSnapshotMessage() { - // No-op. - } - - /** - * @param snapVer Snapshot version. - * @param deltas Deltas map. - */ - public GridClockDeltaSnapshotMessage(GridClockDeltaVersion snapVer, Map deltas) { - this.snapVer = snapVer; - this.deltas = deltas; - } - - /** - * @return Snapshot version. - */ - public GridClockDeltaVersion snapshotVersion() { - return snapVer; - } - - /** - * @return Time deltas map. - */ - public Map deltas() { - return deltas; - } - - /** {@inheritDoc} */ - @Override public void onAckReceived() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeMap("deltas", deltas, MessageCollectionItemType.UUID, MessageCollectionItemType.LONG)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeMessage("snapVer", snapVer)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - switch (reader.state()) { - case 0: - deltas = reader.readMap("deltas", MessageCollectionItemType.UUID, MessageCollectionItemType.LONG, false); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - snapVer = reader.readMessage("snapVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return reader.afterMessageRead(GridClockDeltaSnapshotMessage.class); - } - - /** {@inheritDoc} */ - @Override public short directType() { - return 60; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 2; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridClockDeltaSnapshotMessage.class, this); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaVersion.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaVersion.java index 19c75e6..e69de29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaVersion.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaVersion.java @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.clock; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.nio.ByteBuffer; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; - -/** - * Version for time delta snapshot. - */ -public class GridClockDeltaVersion implements Message, Comparable, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Snapshot local version. */ - private long ver; - - /** Topology version. */ - private long topVer; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public GridClockDeltaVersion() { - // No-op. - } - - /** - * @param ver Version. - * @param topVer Topology version. - */ - public GridClockDeltaVersion(long ver, long topVer) { - this.ver = ver; - this.topVer = topVer; - } - - /** - * @return Snapshot local version. - */ - public long version() { - return ver; - } - - /** - * @return Snapshot topology version. - */ - public long topologyVersion() { - return topVer; - } - - /** {@inheritDoc} */ - @Override public void onAckReceived() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public int compareTo(GridClockDeltaVersion o) { - int res = Long.compare(topVer, o.topVer); - - if (res == 0) - res = Long.compare(ver, o.ver); - - return res; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (!(o instanceof GridClockDeltaVersion)) - return false; - - GridClockDeltaVersion that = (GridClockDeltaVersion)o; - - return topVer == that.topVer && ver == that.ver; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = (int)(ver ^ (ver >>> 32)); - - res = 31 * res + (int)(topVer ^ (topVer >>> 32)); - - return res; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(ver); - out.writeLong(topVer); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - ver = in.readLong(); - topVer = in.readLong(); - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeLong("topVer", topVer)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeLong("ver", ver)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - switch (reader.state()) { - case 0: - topVer = reader.readLong("topVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - ver = reader.readLong("ver"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return reader.afterMessageRead(GridClockDeltaVersion.class); - } - - /** {@inheritDoc} */ - @Override public short directType() { - return 83; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 2; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridClockDeltaVersion.class, this); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockMessage.java deleted file mode 100644 index 99dc817..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockMessage.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.clock; - -import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * Time server message. - */ -public class GridClockMessage { - /** Packet size. */ - public static final int PACKET_SIZE = 48; - - /** Originating node ID. */ - private UUID origNodeId; - - /** Target node ID. */ - private UUID targetNodeId; - - /** Originating timestamp. */ - private long origTs; - - /** Remote node reply ts. */ - private long replyTs; - - /** - * @param origNodeId Originating node ID. - * @param targetNodeId Target node ID. - * @param origTs Originating timestamp. - * @param replyTs Reply timestamp. - */ - public GridClockMessage(UUID origNodeId, UUID targetNodeId, long origTs, long replyTs) { - this.origNodeId = origNodeId; - this.targetNodeId = targetNodeId; - this.origTs = origTs; - this.replyTs = replyTs; - } - - /** - * @return Originating node ID. - */ - public UUID originatingNodeId() { - return origNodeId; - } - - /** - * @param origNodeId Originating node ID. - */ - public void originatingNodeId(UUID origNodeId) { - this.origNodeId = origNodeId; - } - - /** - * @return Target node ID. - */ - public UUID targetNodeId() { - return targetNodeId; - } - - /** - * @param targetNodeId Target node ID. - */ - public void targetNodeId(UUID targetNodeId) { - this.targetNodeId = targetNodeId; - } - - /** - * @return Originating timestamp. - */ - public long originatingTimestamp() { - return origTs; - } - - /** - * @param origTs Originating timestamp. - */ - public void originatingTimestamp(long origTs) { - this.origTs = origTs; - } - - /** - * @return Reply timestamp. - */ - public long replyTimestamp() { - return replyTs; - } - - /** - * @param replyTs Reply timestamp. - */ - public void replyTimestamp(long replyTs) { - this.replyTs = replyTs; - } - - /** - * Converts message to bytes to send over network. - * - * @return Bytes representing this packet. - */ - public byte[] toBytes() { - byte[] buf = new byte[PACKET_SIZE]; - - int off = 0; - - off = U.longToBytes(origNodeId.getLeastSignificantBits(), buf, off); - off = U.longToBytes(origNodeId.getMostSignificantBits(), buf, off); - - off = U.longToBytes(targetNodeId.getLeastSignificantBits(), buf, off); - off = U.longToBytes(targetNodeId.getMostSignificantBits(), buf, off); - - off = U.longToBytes(origTs, buf, off); - - off = U.longToBytes(replyTs, buf, off); - - assert off == PACKET_SIZE; - - return buf; - } - - /** - * Constructs message from bytes. - * - * @param buf Bytes. - * @param off Offset. - * @param len Packet length. - * @return Assembled message. - * @throws IgniteCheckedException If message length is invalid. - */ - public static GridClockMessage fromBytes(byte[] buf, int off, int len) throws IgniteCheckedException { - if (len < PACKET_SIZE) - throw new IgniteCheckedException("Failed to assemble time server packet (message is too short)."); - - long lsb = U.bytesToLong(buf, off); - long msb = U.bytesToLong(buf, off + 8); - - UUID origNodeId = new UUID(msb, lsb); - - lsb = U.bytesToLong(buf, off + 16); - msb = U.bytesToLong(buf, off + 24); - - UUID targetNodeId = new UUID(msb, lsb); - - long origTs = U.bytesToLong(buf, off + 32); - long replyTs = U.bytesToLong(buf, off + 40); - - return new GridClockMessage(origNodeId, targetNodeId, origTs, replyTs); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridClockMessage.class, this); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java index 51d396a..e69de29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java @@ -1,222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.clock; - -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetAddress; -import java.net.SocketException; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.internal.util.worker.GridWorker; -import org.apache.ignite.thread.IgniteThread; - -/** - * Time server that enables time synchronization between nodes. - */ -public class GridClockServer { - /** Kernal context. */ - private GridKernalContext ctx; - - /** Datagram socket for message exchange. */ - private DatagramSocket sock; - - /** Logger. */ - private IgniteLogger log; - - /** Read worker. */ - private GridWorker readWorker; - - /** Instance of time processor. */ - private GridClockSyncProcessor clockSync; - - /** - * Starts server. - * - * @param ctx Kernal context. - * @throws IgniteCheckedException If server could not be started. - */ - public void start(GridKernalContext ctx) throws IgniteCheckedException { - this.ctx = ctx; - - clockSync = ctx.clockSync(); - log = ctx.log(GridClockServer.class); - - try { - int startPort = ctx.config().getTimeServerPortBase(); - int portRange = ctx.config().getTimeServerPortRange(); - int endPort = portRange == 0 ? startPort : startPort + portRange - 1; - - InetAddress locHost; - - if (F.isEmpty(ctx.config().getLocalHost())) { - try { - locHost = U.getLocalHost(); - } - catch (IOException ignored) { - locHost = InetAddress.getLoopbackAddress(); - - U.warn(log, "Failed to get local host address, will use loopback address: " + locHost); - } - } - else - locHost = InetAddress.getByName(ctx.config().getLocalHost()); - - for (int p = startPort; p <= endPort; p++) { - try { - sock = new DatagramSocket(p, locHost); - - if (log.isDebugEnabled()) - log.debug("Successfully bound time server [host=" + locHost + ", port=" + p + ']'); - - break; - } - catch (SocketException e) { - if (log.isDebugEnabled()) - log.debug("Failed to bind time server socket [host=" + locHost + ", port=" + p + - ", err=" + e.getMessage() + ']'); - } - } - - if (sock == null) - throw new IgniteCheckedException("Failed to bind time server socket within specified port range " + - "[locHost=" + locHost + ", startPort=" + startPort + ", endPort=" + endPort + ']'); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to start time server (failed to get local host address)", e); - } - } - - /** - * After start callback. - */ - public void afterStart() { - readWorker = new ReadWorker(); - - IgniteThread th = new IgniteThread(readWorker); - - th.setPriority(Thread.MAX_PRIORITY); - - th.start(); - } - - /** - * Stops server. - */ - public void stop() { - // No-op. - } - - /** - * Before stop callback. - */ - public void beforeStop() { - if (readWorker != null) - readWorker.cancel(); - - U.closeQuiet(sock); - - if (readWorker != null) - U.join(readWorker, log); - } - - /** - * Sends packet to remote node. - * - * @param msg Message to send. - * @param addr Address. - * @param port Port. - * @throws IgniteCheckedException If send failed. - */ - public void sendPacket(GridClockMessage msg, InetAddress addr, int port) throws IgniteCheckedException { - try { - DatagramPacket packet = new DatagramPacket(msg.toBytes(), GridClockMessage.PACKET_SIZE, addr, port); - - if (log.isDebugEnabled()) - log.debug("Sending time sync packet [msg=" + msg + ", addr=" + addr + ", port=" + port); - - sock.send(packet); - } - catch (IOException e) { - if (!sock.isClosed()) - throw new IgniteCheckedException("Failed to send datagram message to remote node [addr=" + addr + - ", port=" + port + ", msg=" + msg + ']', e); - } - } - - /** - * @return Address to which this server is bound. - */ - public InetAddress host() { - return sock.getLocalAddress(); - } - - /** - * @return Port to which this server is bound. - */ - public int port() { - return sock.getLocalPort(); - } - - /** - * Message read worker. - */ - private class ReadWorker extends GridWorker { - /** - * Creates read worker. - */ - protected ReadWorker() { - super(ctx.igniteInstanceName(), "grid-time-server-reader", GridClockServer.this.log); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { - DatagramPacket packet = new DatagramPacket(new byte[GridClockMessage.PACKET_SIZE], - GridClockMessage.PACKET_SIZE); - - while (!isCancelled()) { - try { - // Read packet from buffer. - sock.receive(packet); - - if (log.isDebugEnabled()) - log.debug("Received clock sync message from remote node [host=" + packet.getAddress() + - ", port=" + packet.getPort() + ']'); - - GridClockMessage msg = GridClockMessage.fromBytes(packet.getData(), packet.getOffset(), - packet.getLength()); - - clockSync.onMessageReceived(msg, packet.getAddress(), packet.getPort()); - } - catch (IgniteCheckedException e) { - U.warn(log, "Failed to assemble clock server message (will ignore the packet) [host=" + - packet.getAddress() + ", port=" + packet.getPort() + ", err=" + e.getMessage() + ']'); - } - catch (IOException e) { - if (!isCancelled()) - U.warn(log, "Failed to receive message on datagram socket: " + e); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSource.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSource.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSource.java deleted file mode 100644 index ef7dc06..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSource.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.clock; - -/** - * Interface representing time source for time processor. - */ -public interface GridClockSource { - /** - * Gets current time in milliseconds past since 1 January, 1970. - * - * @return Current time in milliseconds. - */ - public long currentTimeMillis(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java index 6c9ffe8..e69de29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java @@ -1,481 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.clock; - -import java.net.InetAddress; -import java.util.Collection; -import java.util.Map; -import java.util.NavigableMap; -import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; -import org.apache.ignite.events.EventType; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.managers.communication.GridMessageListener; -import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot; -import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; -import org.apache.ignite.internal.processors.GridProcessorAdapter; -import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap; -import org.apache.ignite.internal.util.GridSpinReadWriteLock; -import org.apache.ignite.internal.util.typedef.T2; -import org.apache.ignite.internal.util.typedef.internal.LT; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.internal.util.worker.GridWorker; -import org.apache.ignite.thread.IgniteThread; - -import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; -import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; -import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; -import static org.apache.ignite.internal.GridTopic.TOPIC_TIME_SYNC; -import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TIME_SERVER_HOST; -import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TIME_SERVER_PORT; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; - -/** - * Time synchronization processor. - */ -public class GridClockSyncProcessor extends GridProcessorAdapter { - /** Maximum size for time sync history. */ - private static final int MAX_TIME_SYNC_HISTORY = 100; - - /** Time server instance. */ - private GridClockServer srv; - - /** Shutdown lock. */ - private GridSpinReadWriteLock rw = new GridSpinReadWriteLock(); - - /** Stopping flag. */ - private volatile boolean stopping; - - /** Time coordinator thread. */ - private volatile TimeCoordinator timeCoord; - - /** Time delta history. Constructed on coordinator. */ - private NavigableMap timeSyncHist = - new GridBoundedConcurrentOrderedMap<>(MAX_TIME_SYNC_HISTORY); - - /** Last recorded. */ - private volatile T2 lastSnapshot; - - /** Time source. */ - private GridClockSource clockSrc; - - /** - * @param ctx Kernal context. - */ - public GridClockSyncProcessor(GridKernalContext ctx) { - super(ctx); - } - - /** {@inheritDoc} */ - @Override public void start(boolean activeOnStart) throws IgniteCheckedException { - super.start(activeOnStart); - - clockSrc = ctx.timeSource(); - - srv = new GridClockServer(); - - srv.start(ctx); - - ctx.io().addMessageListener(TOPIC_TIME_SYNC, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { - assert msg instanceof GridClockDeltaSnapshotMessage; - - GridClockDeltaSnapshotMessage msg0 = (GridClockDeltaSnapshotMessage)msg; - - GridClockDeltaVersion ver = msg0.snapshotVersion(); - - GridClockDeltaSnapshot snap = new GridClockDeltaSnapshot(ver, msg0.deltas()); - - lastSnapshot = new T2<>(ver, snap); - - timeSyncHist.put(ver, snap); - } - }); - - // We care only about node leave and fail events. - ctx.event().addLocalEventListener(new GridLocalEventListener() { - @Override public void onEvent(Event evt) { - assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_JOINED; - - DiscoveryEvent discoEvt = (DiscoveryEvent)evt; - - if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) - checkLaunchCoordinator(discoEvt); - - TimeCoordinator timeCoord0 = timeCoord; - - if (timeCoord0 != null) - timeCoord0.onDiscoveryEvent(discoEvt); - } - }, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_NODE_JOINED); - - ctx.addNodeAttribute(ATTR_TIME_SERVER_HOST, srv.host()); - ctx.addNodeAttribute(ATTR_TIME_SERVER_PORT, srv.port()); - } - - /** {@inheritDoc} */ - @Override public void onKernalStart(boolean activeOnStart) throws IgniteCheckedException { - super.onKernalStart(activeOnStart); - - srv.afterStart(); - - // Check at startup if this node is a fragmentizer coordinator. - DiscoveryEvent locJoinEvt = ctx.discovery().localJoinEvent(); - - checkLaunchCoordinator(locJoinEvt); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - super.onKernalStop(cancel); - - rw.writeLock(); - - try { - stopping = false; - - if (timeCoord != null) { - timeCoord.cancel(); - - U.join(timeCoord, log); - - timeCoord = null; - } - - if (srv != null) - srv.beforeStop(); - } - finally { - rw.writeUnlock(); - } - } - - /** {@inheritDoc} */ - @Override public void stop(boolean cancel) throws IgniteCheckedException { - super.stop(cancel); - - if (srv != null) - srv.stop(); - } - - /** - * Gets current time on local node. - * - * @return Current time in milliseconds. - */ - private long currentTime() { - return clockSrc.currentTimeMillis(); - } - - /** - * @return Time sync history. - */ - public NavigableMap timeSyncHistory() { - return timeSyncHist; - } - - /** - * Callback from server for message receiving. - * - * @param msg Received message. - * @param addr Remote node address. - * @param port Remote node port. - */ - public void onMessageReceived(GridClockMessage msg, InetAddress addr, int port) { - long rcvTs = currentTime(); - - if (!msg.originatingNodeId().equals(ctx.localNodeId())) { - // We received time request from remote node, set current time and reply back. - msg.replyTimestamp(rcvTs); - - try { - srv.sendPacket(msg, addr, port); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send time server reply to remote node: " + msg, e); - } - } - else - timeCoord.onMessage(msg, rcvTs); - } - - /** - * Checks if local node is the oldest node in topology and starts time coordinator if so. - * - * @param discoEvt Discovery event. - */ - private void checkLaunchCoordinator(DiscoveryEvent discoEvt) { - rw.readLock(); - - try { - if (stopping) - return; - - if (timeCoord == null) { - long minNodeOrder = Long.MAX_VALUE; - - Collection nodes = discoEvt.topologyNodes(); - - for (ClusterNode node : nodes) { - if (node.order() < minNodeOrder) - minNodeOrder = node.order(); - } - - ClusterNode locNode = ctx.discovery().localNode(); - - if (locNode.order() == minNodeOrder) { - if (log.isDebugEnabled()) - log.debug("Detected local node to be the eldest node in topology, starting time " + - "coordinator thread [discoEvt=" + discoEvt + ", locNode=" + locNode + ']'); - - synchronized (this) { - if (timeCoord == null && !stopping) { - timeCoord = new TimeCoordinator(discoEvt); - - IgniteThread th = new IgniteThread(timeCoord); - - th.setPriority(Thread.MAX_PRIORITY); - - th.start(); - } - } - } - } - } - finally { - rw.readUnlock(); - } - } - - /** - * Gets time adjusted with time coordinator on given topology version. - * - * @param topVer Topology version. - * @return Adjusted time. - */ - public long adjustedTime(long topVer) { - T2 fastSnap = lastSnapshot; - - GridClockDeltaSnapshot snap; - - if (fastSnap != null && fastSnap.get1().topologyVersion() == topVer) - snap = fastSnap.get2(); - else { - // Get last synchronized time on given topology version. - Map.Entry entry = timeSyncHistory().lowerEntry( - new GridClockDeltaVersion(0, topVer + 1)); - - snap = entry == null ? null : entry.getValue(); - } - - long now = clockSrc.currentTimeMillis(); - - if (snap == null) - return now; - - Long delta = snap.deltas().get(ctx.localNodeId()); - - if (delta == null) - delta = 0L; - - return now + delta; - } - - /** - * Publishes snapshot to topology. - * - * @param snapshot Snapshot to publish. - * @param top Topology to send given snapshot to. - */ - private void publish(GridClockDeltaSnapshot snapshot, GridDiscoveryTopologySnapshot top) { - if (!rw.tryReadLock()) - return; - - try { - lastSnapshot = new T2<>(snapshot.version(), snapshot); - - timeSyncHist.put(snapshot.version(), snapshot); - - for (ClusterNode n : top.topologyNodes()) { - GridClockDeltaSnapshotMessage msg = new GridClockDeltaSnapshotMessage( - snapshot.version(), snapshot.deltas()); - - try { - ctx.io().sendToGridTopic(n, TOPIC_TIME_SYNC, msg, SYSTEM_POOL); - } - catch (IgniteCheckedException e) { - if (ctx.discovery().pingNodeNoError(n.id())) - U.error(log, "Failed to send time sync snapshot to remote node (did not leave grid?) " + - "[nodeId=" + n.id() + ", msg=" + msg + ", err=" + e.getMessage() + ']'); - else if (log.isDebugEnabled()) - log.debug("Failed to send time sync snapshot to remote node (did not leave grid?) " + - "[nodeId=" + n.id() + ", msg=" + msg + ", err=" + e.getMessage() + ']'); - } - } - } - finally { - rw.readUnlock(); - } - } - - /** - * Time coordinator thread. - */ - private class TimeCoordinator extends GridWorker { - /** Last discovery topology snapshot. */ - private volatile GridDiscoveryTopologySnapshot lastSnapshot; - - /** Snapshot being constructed. May be not null only on coordinator node. */ - private volatile GridClockDeltaSnapshot pendingSnapshot; - - /** Version counter. */ - private long verCnt = 1; - - /** - * Time coordinator thread constructor. - * - * @param evt Discovery event on which this node became a coordinator. - */ - protected TimeCoordinator(DiscoveryEvent evt) { - super(ctx.igniteInstanceName(), "grid-time-coordinator", GridClockSyncProcessor.this.log); - - lastSnapshot = new GridDiscoveryTopologySnapshot(evt.topologyVersion(), evt.topologyNodes()); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { - while (!isCancelled()) { - GridDiscoveryTopologySnapshot top = lastSnapshot; - - if (log.isDebugEnabled()) - log.debug("Creating time sync snapshot for topology: " + top); - - GridClockDeltaSnapshot snapshot = new GridClockDeltaSnapshot( - new GridClockDeltaVersion(verCnt++, top.topologyVersion()), - ctx.localNodeId(), - top, - ctx.config().getClockSyncSamples()); - - pendingSnapshot = snapshot; - - while (!snapshot.ready()) { - if (log.isDebugEnabled()) - log.debug("Requesting time from remote nodes: " + snapshot.pendingNodeIds()); - - for (UUID nodeId : snapshot.pendingNodeIds()) - requestTime(nodeId); - - if (log.isDebugEnabled()) - log.debug("Waiting for snapshot to be ready: " + snapshot); - - // Wait for all replies - snapshot.awaitReady(1000); - } - - // No more messages should be processed. - pendingSnapshot = null; - - if (log.isDebugEnabled()) - log.debug("Collected time sync results: " + snapshot.deltas()); - - publish(snapshot, top); - - synchronized (this) { - if (top.topologyVersion() == lastSnapshot.topologyVersion()) - wait(ctx.config().getClockSyncFrequency()); - } - } - } - - /** - * @param evt Discovery event. - */ - public void onDiscoveryEvent(DiscoveryEvent evt) { - if (log.isDebugEnabled()) - log.debug("Processing discovery event: " + evt); - - if (evt.type() == EventType.EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT) - onNodeLeft(evt.eventNode().id()); - - synchronized (this) { - lastSnapshot = new GridDiscoveryTopologySnapshot(evt.topologyVersion(), evt.topologyNodes()); - - notifyAll(); - } - } - - /** - * @param msg Message received from remote node. - * @param rcvTs Receive timestamp. - */ - private void onMessage(GridClockMessage msg, long rcvTs) { - GridClockDeltaSnapshot curr = pendingSnapshot; - - if (curr != null) { - long delta = (msg.originatingTimestamp() + rcvTs) / 2 - msg.replyTimestamp(); - - boolean needMore = curr.onDeltaReceived(msg.targetNodeId(), delta); - - if (needMore) - requestTime(msg.targetNodeId()); - } - } - - /** - * Requests time from remote node. - * - * @param rmtNodeId Remote node ID. - */ - private void requestTime(UUID rmtNodeId) { - ClusterNode node = ctx.discovery().node(rmtNodeId); - - if (node != null) { - InetAddress addr = node.attribute(ATTR_TIME_SERVER_HOST); - int port = node.attribute(ATTR_TIME_SERVER_PORT); - - try { - GridClockMessage req = new GridClockMessage(ctx.localNodeId(), rmtNodeId, currentTime(), 0); - - srv.sendPacket(req, addr, port); - } - catch (IgniteCheckedException e) { - LT.error(log, e, "Failed to send time request to remote node [rmtNodeId=" + rmtNodeId + - ", addr=" + addr + ", port=" + port + ']'); - } - } - else - onNodeLeft(rmtNodeId); - } - - /** - * Node left callback. - * - * @param nodeId Left node ID. - */ - private void onNodeLeft(UUID nodeId) { - GridClockDeltaSnapshot curr = pendingSnapshot; - - if (curr != null) - curr.onNodeLeft(nodeId); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridJvmClockSource.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridJvmClockSource.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridJvmClockSource.java deleted file mode 100644 index 77bc3eb..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridJvmClockSource.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.clock; - -/** - * JVM time source. - */ -public class GridJvmClockSource implements GridClockSource { - /** {@inheritDoc} */ - @Override public long currentTimeMillis() { - return System.currentTimeMillis(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index e4cb0ce..102db96 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -85,7 +85,6 @@ import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupp import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; -import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; @@ -975,7 +974,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen ccfg.setAtomicityMode(cfg.getAtomicityMode()); ccfg.setNodeFilter(cfg.getNodeFilter()); ccfg.setWriteSynchronizationMode(FULL_SYNC); - ccfg.setAtomicWriteOrderMode(PRIMARY); ccfg.setRebalanceMode(SYNC); return ccfg; http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java index 3cf88d3..bb11b7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java @@ -25,7 +25,6 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.binary.BinaryRawReader; import org.apache.ignite.binary.BinaryRawWriter; -import org.apache.ignite.cache.CacheAtomicWriteOrderMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; @@ -528,7 +527,6 @@ public class IgfsUtils { */ private static CacheConfiguration defaultCacheConfig() { CacheConfiguration cfg = new CacheConfiguration(); - cfg.setAtomicWriteOrderMode(CacheAtomicWriteOrderMode.PRIMARY); cfg.setAtomicityMode(TRANSACTIONAL); cfg.setWriteSynchronizationMode(FULL_SYNC); cfg.setCacheMode(CacheMode.PARTITIONED); http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java index 17a5a16..6a15b85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java @@ -36,7 +36,6 @@ import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.IgniteException; import org.apache.ignite.binary.BinaryRawReader; import org.apache.ignite.binary.BinaryRawWriter; -import org.apache.ignite.cache.CacheAtomicWriteOrderMode; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheRebalanceMode; @@ -140,7 +139,6 @@ public class PlatformConfigurationUtils { CacheConfiguration ccfg = new CacheConfiguration(); ccfg.setAtomicityMode(CacheAtomicityMode.fromOrdinal(in.readInt())); - ccfg.setAtomicWriteOrderMode(CacheAtomicWriteOrderMode.fromOrdinal((byte)in.readInt())); ccfg.setBackups(in.readInt()); ccfg.setCacheMode(CacheMode.fromOrdinal(in.readInt())); ccfg.setCopyOnRead(in.readBoolean()); @@ -740,7 +738,6 @@ public class PlatformConfigurationUtils { assert ccfg != null; writeEnumInt(writer, ccfg.getAtomicityMode(), CacheConfiguration.DFLT_CACHE_ATOMICITY_MODE); - writeEnumInt(writer, ccfg.getAtomicWriteOrderMode()); writer.writeInt(ccfg.getBackups()); writeEnumInt(writer, ccfg.getCacheMode(), CacheConfiguration.DFLT_CACHE_MODE); writer.writeBoolean(ccfg.isCopyOnRead()); http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 2023749..7d7d071 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -9285,10 +9285,6 @@ public abstract class IgniteUtils { off += 4; - GridUnsafe.putLong(arr, off, drVer.globalTime()); - - off += 8; - GridUnsafe.putLong(arr, off, drVer.order()); off += 8; @@ -9302,10 +9298,6 @@ public abstract class IgniteUtils { off += 4; - GridUnsafe.putLong(arr, off, ver.globalTime()); - - off += 8; - GridUnsafe.putLong(arr, off, ver.order()); off += 8; @@ -9321,16 +9313,14 @@ public abstract class IgniteUtils { public static GridCacheVersion readVersion(long ptr, boolean verEx) { GridCacheVersion ver = new GridCacheVersion(GridUnsafe.getInt(ptr), GridUnsafe.getInt(ptr + 4), - GridUnsafe.getLong(ptr + 8), - GridUnsafe.getLong(ptr + 16)); + GridUnsafe.getLong(ptr + 8)); if (verEx) { - ptr += 24; + ptr += 16; ver = new GridCacheVersionEx(GridUnsafe.getInt(ptr), GridUnsafe.getInt(ptr + 4), GridUnsafe.getLong(ptr + 8), - GridUnsafe.getLong(ptr + 16), ver); } @@ -9352,15 +9342,11 @@ public abstract class IgniteUtils { off += 4; - long globalTime = GridUnsafe.getLong(arr, off); - - off += 8; - long order = GridUnsafe.getLong(arr, off); off += 8; - GridCacheVersion ver = new GridCacheVersion(topVer, nodeOrderDrId, globalTime, order); + GridCacheVersion ver = new GridCacheVersion(topVer, nodeOrderDrId, order); if (verEx) { topVer = GridUnsafe.getInt(arr, off); @@ -9371,13 +9357,9 @@ public abstract class IgniteUtils { off += 4; - globalTime = GridUnsafe.getLong(arr, off); - - off += 8; - order = GridUnsafe.getLong(arr, off); - ver = new GridCacheVersionEx(topVer, nodeOrderDrId, globalTime, order, ver); + ver = new GridCacheVersionEx(topVer, nodeOrderDrId, order, ver); } return ver; http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheConfiguration.java index 91a501c..391b120 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheConfiguration.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.List; -import org.apache.ignite.cache.CacheAtomicWriteOrderMode; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; @@ -53,9 +52,6 @@ public class VisorCacheConfiguration extends VisorDataTransferObject { /** Cache atomicity mode. */ private CacheAtomicityMode atomicityMode; - /** Cache atomicity write ordering mode. */ - private CacheAtomicWriteOrderMode atomicWriteOrderMode; - /** Eager ttl flag. */ private boolean eagerTtl; @@ -148,7 +144,6 @@ public class VisorCacheConfiguration extends VisorDataTransferObject { name = ccfg.getName(); mode = ccfg.getCacheMode(); atomicityMode = ccfg.getAtomicityMode(); - atomicWriteOrderMode = ccfg.getAtomicWriteOrderMode(); eagerTtl = ccfg.isEagerTtl(); writeSynchronizationMode = ccfg.getWriteSynchronizationMode(); invalidate = ccfg.isInvalidate(); @@ -202,10 +197,10 @@ public class VisorCacheConfiguration extends VisorDataTransferObject { } /** - * @return Cache atomicity write ordering mode. + * @return Eager ttl flag */ - public CacheAtomicWriteOrderMode getAtomicWriteOrderMode() { - return atomicWriteOrderMode; + public boolean eagerTtl() { + return eagerTtl; } /** @@ -395,7 +390,6 @@ public class VisorCacheConfiguration extends VisorDataTransferObject { U.writeString(out, name); U.writeEnum(out, mode); U.writeEnum(out, atomicityMode); - U.writeEnum(out, atomicWriteOrderMode); out.writeBoolean(eagerTtl); U.writeEnum(out, writeSynchronizationMode); out.writeBoolean(invalidate); @@ -429,7 +423,6 @@ public class VisorCacheConfiguration extends VisorDataTransferObject { name = U.readString(in); mode = CacheMode.fromOrdinal(in.readByte()); atomicityMode = CacheAtomicityMode.fromOrdinal(in.readByte()); - atomicWriteOrderMode = CacheAtomicWriteOrderMode.fromOrdinal(in.readByte()); eagerTtl = in.readBoolean(); writeSynchronizationMode = CacheWriteSynchronizationMode.fromOrdinal(in.readByte()); invalidate = in.readBoolean(); http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/startup/BasicWarmupClosure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/startup/BasicWarmupClosure.java b/modules/core/src/main/java/org/apache/ignite/startup/BasicWarmupClosure.java index a5e83c1..aa84706 100644 --- a/modules/core/src/main/java/org/apache/ignite/startup/BasicWarmupClosure.java +++ b/modules/core/src/main/java/org/apache/ignite/startup/BasicWarmupClosure.java @@ -413,8 +413,7 @@ public class BasicWarmupClosure implements IgniteInClosure return F.eq(ccfg0.getCacheMode(), ccfg1.getCacheMode()) && F.eq(ccfg0.getBackups(), ccfg1.getBackups()) && - F.eq(ccfg0.getAtomicityMode(), ccfg1.getAtomicityMode()) && - F.eq(ccfg0.getAtomicWriteOrderMode(), ccfg1.getAtomicWriteOrderMode()); + F.eq(ccfg0.getAtomicityMode(), ccfg1.getAtomicityMode()); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/resources/META-INF/classnames.properties ---------------------------------------------------------------------- diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index ebd28d8..22124af 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -29,7 +29,6 @@ org.apache.ignite.binary.BinaryInvalidTypeException org.apache.ignite.binary.BinaryObject org.apache.ignite.binary.BinaryObjectException org.apache.ignite.cache.CacheAtomicUpdateTimeoutException -org.apache.ignite.cache.CacheAtomicWriteOrderMode org.apache.ignite.cache.CacheAtomicityMode org.apache.ignite.cache.CacheEntryEventSerializableFilter org.apache.ignite.cache.CacheEntryProcessor @@ -977,8 +976,8 @@ org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl$UserCacheObjectByteArrayImpl org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl$UserCacheObjectImpl org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl$UserKeyCacheObjectImpl -org.apache.ignite.internal.processors.clock.GridClockDeltaSnapshotMessage -org.apache.ignite.internal.processors.clock.GridClockDeltaVersion +org.apache.ignite.internal.processors.closure.GridClosureProcessor$C1 +org.apache.ignite.internal.processors.closure.GridClosureProcessor$C1MLA org.apache.ignite.internal.processors.closure.GridClosureProcessor$C1 org.apache.ignite.internal.processors.closure.GridClosureProcessor$C1MLA org.apache.ignite.internal.processors.closure.GridClosureProcessor$C2 http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/test/config/websession/example-cache-base.xml ---------------------------------------------------------------------- diff --git a/modules/core/src/test/config/websession/example-cache-base.xml b/modules/core/src/test/config/websession/example-cache-base.xml index 20f103e..9654fab 100644 --- a/modules/core/src/test/config/websession/example-cache-base.xml +++ b/modules/core/src/test/config/websession/example-cache-base.xml @@ -65,7 +65,6 @@ - http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/test/config/websession/spring-cache-1.xml ---------------------------------------------------------------------- diff --git a/modules/core/src/test/config/websession/spring-cache-1.xml b/modules/core/src/test/config/websession/spring-cache-1.xml index 9d049fa..f810282 100644 --- a/modules/core/src/test/config/websession/spring-cache-1.xml +++ b/modules/core/src/test/config/websession/spring-cache-1.xml @@ -40,8 +40,6 @@ - - @@ -56,8 +54,6 @@ - - @@ -72,8 +68,6 @@ - - http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/test/config/websession/spring-cache-2.xml ---------------------------------------------------------------------- diff --git a/modules/core/src/test/config/websession/spring-cache-2.xml b/modules/core/src/test/config/websession/spring-cache-2.xml index 7545ec7..ec20ed9 100644 --- a/modules/core/src/test/config/websession/spring-cache-2.xml +++ b/modules/core/src/test/config/websession/spring-cache-2.xml @@ -40,8 +40,6 @@ - - @@ -56,8 +54,6 @@ - - @@ -72,8 +68,6 @@ - - http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/test/config/websession/spring-cache-3.xml ---------------------------------------------------------------------- diff --git a/modules/core/src/test/config/websession/spring-cache-3.xml b/modules/core/src/test/config/websession/spring-cache-3.xml index a0fb189..6996c40 100644 --- a/modules/core/src/test/config/websession/spring-cache-3.xml +++ b/modules/core/src/test/config/websession/spring-cache-3.xml @@ -40,8 +40,6 @@ - - @@ -56,8 +54,6 @@ - - @@ -72,8 +68,6 @@ - - http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index dff827d..5e3b896 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -38,7 +38,6 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteTransactions; import org.apache.ignite.Ignition; -import org.apache.ignite.cache.CacheAtomicWriteOrderMode; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterGroup; @@ -74,9 +73,6 @@ import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; -import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; -import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.values; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; @@ -176,7 +172,6 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac ccfg.setWriteSynchronizationMode(FULL_SYNC); ccfg.setName("nearCache"); - ccfg.setAtomicWriteOrderMode(PRIMARY); final IgniteCache nearCache = client.getOrCreateCache(ccfg, new NearCacheConfiguration<>()); @@ -215,7 +210,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac catch (CacheException e) { log.info("Expected exception: " + e); - IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException) e.getCause(); + IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException)e.getCause(); e0.reconnectFuture().get(); } @@ -229,7 +224,8 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac })); disconnectLatch.countDown(); - } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { info("Reconnected: " + evt); assertEquals(0, disconnectLatch.getCount()); @@ -344,7 +340,8 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac tx.commit(); fail(); - } catch (IgniteClientDisconnectedException e) { + } + catch (IgniteClientDisconnectedException e) { log.info("Expected error: " + e); assertNotNull(e.reconnectFuture()); @@ -354,7 +351,8 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac txs.txStart(); fail(); - } catch (IgniteClientDisconnectedException e) { + } + catch (IgniteClientDisconnectedException e) { log.info("Expected error: " + e); assertNotNull(e.reconnectFuture()); @@ -412,8 +410,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac private void reconnectTransactionInProgress1(IgniteEx client, final TransactionConcurrency txConcurrency, final IgniteCache cache) - throws Exception - { + throws Exception { Ignite srv = clientRouter(client); final TestTcpDiscoverySpi clientSpi = spi(client); @@ -432,7 +429,8 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac info("Disconnected: " + evt); disconnectLatch.countDown(); - } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { info("Reconnected: " + evt); reconnectLatch.countDown(); @@ -787,42 +785,34 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac int cnt = 0; for (CacheAtomicityMode atomicityMode : CacheAtomicityMode.values()) { - CacheAtomicWriteOrderMode[] writeOrders = - atomicityMode == ATOMIC ? values() : new CacheAtomicWriteOrderMode[]{CLOCK}; - - for (CacheAtomicWriteOrderMode writeOrder : writeOrders) { - for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) { - CacheConfiguration ccfg = new CacheConfiguration<>(); - - ccfg.setAtomicityMode(atomicityMode); + for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) { + CacheConfiguration ccfg = new CacheConfiguration<>(); - ccfg.setAtomicWriteOrderMode(writeOrder); + ccfg.setAtomicityMode(atomicityMode); - ccfg.setName("cache-" + cnt++); + ccfg.setName("cache-" + cnt++); - ccfg.setWriteSynchronizationMode(syncMode); + ccfg.setWriteSynchronizationMode(syncMode); - if (syncMode != FULL_ASYNC) { - Class cls = (ccfg.getAtomicityMode() == ATOMIC) ? - GridNearAtomicUpdateResponse.class : GridNearTxPrepareResponse.class; + if (syncMode != FULL_ASYNC) { + Class cls = (ccfg.getAtomicityMode() == ATOMIC) ? + GridNearAtomicUpdateResponse.class : GridNearTxPrepareResponse.class; - log.info("Test cache put [atomicity=" + atomicityMode + - ", writeOrder=" + writeOrder + - ", syncMode=" + syncMode + ']'); + log.info("Test cache put [atomicity=" + atomicityMode + + ", syncMode=" + syncMode + ']'); - checkOperationInProgressFails(client, ccfg, cls, putOp); + checkOperationInProgressFails(client, ccfg, cls, putOp); - client.destroyCache(ccfg.getName()); - } + client.destroyCache(ccfg.getName()); + } - log.info("Test cache get [atomicity=" + atomicityMode + ", syncMode=" + syncMode + ']'); + log.info("Test cache get [atomicity=" + atomicityMode + ", syncMode=" + syncMode + ']'); - checkOperationInProgressFails(client, ccfg, GridNearSingleGetResponse.class, getOp); + checkOperationInProgressFails(client, ccfg, GridNearSingleGetResponse.class, getOp); - checkOperationInProgressFails(client, ccfg, GridNearGetResponse.class, getAllOp); + checkOperationInProgressFails(client, ccfg, GridNearGetResponse.class, getAllOp); - client.destroyCache(ccfg.getName()); - } + client.destroyCache(ccfg.getName()); } } } @@ -1121,7 +1111,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac @Override public Void call() throws Exception { int idx = nodeIdx.incrementAndGet(); - for (int i = 0; i < 25; i++) { + for (int i = 0; i < 25; i++) { startGrid(idx); stopGrid(idx); @@ -1260,27 +1250,32 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac /** * */ - static class TestClass1 implements Serializable {} + static class TestClass1 implements Serializable { + } /** * */ - static class TestClass2 implements Serializable {} + static class TestClass2 implements Serializable { + } /** * */ - static class TestClass3 implements Serializable {} + static class TestClass3 implements Serializable { + } /** * */ - static class TestClass4 implements Serializable {} + static class TestClass4 implements Serializable { + } /** * */ - static class TestClass5 implements Serializable {} + static class TestClass5 implements Serializable { + } /** * @param client Client. @@ -1293,8 +1288,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac final CacheConfiguration ccfg, Class msgToBlock, final IgniteInClosure> c) - throws Exception - { + throws Exception { Ignite srv = clientRouter(client); TestTcpDiscoverySpi srvSpi = spi(srv); @@ -1381,8 +1375,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac String cacheName, boolean cacheExists, boolean clientCache, - boolean clientNear) - { + boolean clientNear) { GridDiscoveryManager srvDisco = ((IgniteKernal)srv).context().discovery(); GridDiscoveryManager clientDisco = ((IgniteKernal)client).context().discovery(); http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAtomicSingleMessageCountSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAtomicSingleMessageCountSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAtomicSingleMessageCountSelfTest.java index a4c59ea..991735b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAtomicSingleMessageCountSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAtomicSingleMessageCountSelfTest.java @@ -42,7 +42,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; @@ -72,7 +71,6 @@ public class CacheAtomicSingleMessageCountSelfTest extends GridCommonAbstractTes cCfg.setCacheMode(PARTITIONED); cCfg.setBackups(1); cCfg.setWriteSynchronizationMode(FULL_SYNC); - cCfg.setAtomicWriteOrderMode(PRIMARY); cfg.setCacheConfiguration(cCfg); http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEnumOperationsAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEnumOperationsAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEnumOperationsAbstractTest.java index 2017365..efaaee8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEnumOperationsAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEnumOperationsAbstractTest.java @@ -34,7 +34,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; @@ -240,7 +239,6 @@ public abstract class CacheEnumOperationsAbstractTest extends GridCommonAbstract ccfg.setAtomicityMode(atomicityMode); ccfg.setCacheMode(cacheMode); ccfg.setWriteSynchronizationMode(FULL_SYNC); - ccfg.setAtomicWriteOrderMode(PRIMARY); if (cacheMode == PARTITIONED) ccfg.setBackups(backups); http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java index 5b7769e..7d49c11 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java @@ -60,7 +60,6 @@ import org.jetbrains.annotations.NotNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; @@ -729,7 +728,6 @@ public class CacheInterceptorPartitionCounterRandomOperationsTest extends GridCo ccfg.setAtomicityMode(atomicityMode); ccfg.setCacheMode(cacheMode); ccfg.setWriteSynchronizationMode(FULL_SYNC); - ccfg.setAtomicWriteOrderMode(PRIMARY); if (cacheMode == PARTITIONED) ccfg.setBackups(backups); http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java index cdd9072..70f2fc5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java @@ -26,7 +26,6 @@ import javax.cache.configuration.Factory; import javax.cache.integration.CacheLoaderException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; -import org.apache.ignite.cache.CacheAtomicWriteOrderMode; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.store.CacheStore; @@ -105,7 +104,6 @@ public abstract class CacheStoreUsageMultinodeAbstractTest extends GridCommonAbs ccfg.setCacheMode(PARTITIONED); ccfg.setAtomicityMode(atomicityMode()); - ccfg.setAtomicWriteOrderMode(CacheAtomicWriteOrderMode.PRIMARY); ccfg.setBackups(1); ccfg.setWriteSynchronizationMode(FULL_SYNC);