ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject ignite git commit: IGNITE-9830 Fixed race in binary metadata registration leading to exception on commit - Fixes #4996.
Date Wed, 17 Oct 2018 12:27:33 GMT
Repository: ignite
Updated Branches:
  refs/heads/master d829b67e9 -> a1cb021c0


IGNITE-9830 Fixed race in binary metadata registration leading to exception on commit - Fixes #4996.

Signed-off-by: Alexey Goncharuk <alexey.goncharuk@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a1cb021c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a1cb021c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a1cb021c

Branch: refs/heads/master
Commit: a1cb021c06ffce5da460a8cd4ffe71de2c350b54
Parents: d829b67
Author: Aleksei Scherbakov <alexey.scherbakoff@gmail.com>
Authored: Wed Oct 17 13:28:20 2018 +0300
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Wed Oct 17 14:52:12 2018 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   5 +
 .../internal/binary/BinaryReaderExImpl.java     |   1 -
 .../internal/binary/BinarySchemaRegistry.java   | 114 +++--
 .../ignite/internal/binary/BinaryUtils.java     |  28 +-
 .../cache/binary/BinaryMetadataTransport.java   |  93 +++-
 .../binary/CacheObjectBinaryProcessorImpl.java  | 219 ++++++++-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   5 +-
 .../CachePageWriteLockUnlockTest.java           |   2 +
 .../transactions/TxRollbackOnTimeoutTest.java   |   7 +-
 ...MetadataConcurrentUpdateWithIndexesTest.java | 439 +++++++++++++++++++
 .../IgniteBinaryCacheQueryTestSuite.java        |   3 +
 11 files changed, 827 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 6d48adf..02ebb25 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -1008,6 +1008,11 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_REUSE_MEMORY_ON_DEACTIVATE = "IGNITE_REUSE_MEMORY_ON_DEACTIVATE";
 
     /**
+     * Timeout for waiting for a schema update if the schema was not found for the last accepted version.
+     */
+    public static final String IGNITE_WAIT_SCHEMA_UPDATE = "IGNITE_WAIT_SCHEMA_UPDATE";
+
+    /**
      * System property to override {@link CacheConfiguration#rebalanceThrottle} configuration property for all caches.
      * {@code 0} by default, which means that override is disabled.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java
index 38934f0..601141c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java
@@ -2028,7 +2028,6 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina
                     for (BinarySchema existingSchema : existingSchemas)
                         existingSchemaIds.add(existingSchema.schemaId());
 
-
                     throw new BinaryObjectException("Cannot find schema for object with compact footer" +
                         " [typeName=" + type.typeName() +
                         ", typeId=" + typeId +

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/main/java/org/apache/ignite/internal/binary/BinarySchemaRegistry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinarySchemaRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinarySchemaRegistry.java
index 91f29b2..f22fc4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinarySchemaRegistry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinarySchemaRegistry.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.binary;
 
+import java.util.ArrayList;
+import java.util.List;
 import org.jetbrains.annotations.Nullable;
 
 import java.util.HashMap;
@@ -98,75 +100,95 @@ public class BinarySchemaRegistry {
      * @param schemaId Schema ID.
      * @param schema Schema.
      */
-    public void addSchema(int schemaId, BinarySchema schema) {
-        synchronized (this) {
-            if (inline) {
-                // Check if this is already known schema.
-                if (schemaId == schemaId1 || schemaId == schemaId2 || schemaId == schemaId3 || schemaId == schemaId4)
-                    return;
+    public synchronized void addSchema(int schemaId, BinarySchema schema) {
+        if (inline) {
+            // Check if this is already known schema.
+            if (schemaId == schemaId1 || schemaId == schemaId2 || schemaId == schemaId3 || schemaId == schemaId4)
+                return;
 
-                // Try positioning new schema in inline mode.
-                if (schemaId1 == EMPTY) {
-                    schemaId1 = schemaId;
+            // Try positioning new schema in inline mode.
+            if (schemaId1 == EMPTY) {
+                schemaId1 = schemaId;
 
-                    schema1 = schema;
+                schema1 = schema;
 
-                    inline = true; // Forcing HB edge just in case.
+                inline = true; // Forcing HB edge just in case.
 
-                    return;
-                }
+                return;
+            }
 
-                if (schemaId2 == EMPTY) {
-                    schemaId2 = schemaId;
+            if (schemaId2 == EMPTY) {
+                schemaId2 = schemaId;
 
-                    schema2 = schema;
+                schema2 = schema;
 
-                    inline = true; // Forcing HB edge just in case.
+                inline = true; // Forcing HB edge just in case.
 
-                    return;
-                }
+                return;
+            }
 
-                if (schemaId3 == EMPTY) {
-                    schemaId3 = schemaId;
+            if (schemaId3 == EMPTY) {
+                schemaId3 = schemaId;
 
-                    schema3 = schema;
+                schema3 = schema;
 
-                    inline = true; // Forcing HB edge just in case.
+                inline = true; // Forcing HB edge just in case.
 
-                    return;
-                }
+                return;
+            }
 
-                if (schemaId4 == EMPTY) {
-                    schemaId4 = schemaId;
+            if (schemaId4 == EMPTY) {
+                schemaId4 = schemaId;
 
-                    schema4 = schema;
+                schema4 = schema;
 
-                    inline = true; // Forcing HB edge just in case.
+                inline = true; // Forcing HB edge just in case.
 
-                    return;
-                }
+                return;
+            }
 
-                // No luck, switching to hash map mode.
-                HashMap<Integer, BinarySchema> newSchemas = new HashMap<>();
+            // No luck, switching to hash map mode.
+            HashMap<Integer, BinarySchema> newSchemas = new HashMap<>();
 
-                newSchemas.put(schemaId1, schema1);
-                newSchemas.put(schemaId2, schema2);
-                newSchemas.put(schemaId3, schema3);
-                newSchemas.put(schemaId4, schema4);
+            newSchemas.put(schemaId1, schema1);
+            newSchemas.put(schemaId2, schema2);
+            newSchemas.put(schemaId3, schema3);
+            newSchemas.put(schemaId4, schema4);
 
-                newSchemas.put(schemaId, schema);
+            newSchemas.put(schemaId, schema);
 
-                schemas = newSchemas;
+            schemas = newSchemas;
 
-                inline = false;
-            }
-            else {
-                HashMap<Integer, BinarySchema> newSchemas = new HashMap<>(schemas);
+            inline = false;
+        }
+        else {
+            HashMap<Integer, BinarySchema> newSchemas = new HashMap<>(schemas);
 
-                newSchemas.put(schemaId, schema);
+            newSchemas.put(schemaId, schema);
 
-                schemas = newSchemas;
-            }
+            schemas = newSchemas;
         }
     }
+
+    /**
+     * @return List of known schemas.
+     */
+    public synchronized List<BinarySchema> schemas() {
+        List<BinarySchema> res = new ArrayList<>();
+
+        if (inline) {
+            if (schemaId1 != EMPTY)
+                res.add(schema1);
+            if (schemaId2 != EMPTY)
+                res.add(schema2);
+            if (schemaId3 != EMPTY)
+                res.add(schema3);
+            if (schemaId4 != EMPTY)
+                res.add(schema4);
+        }
+        else
+            res.addAll(schemas.values());
+
+        return res;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
index 553d8e5..77dce56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
@@ -958,10 +958,30 @@ public class BinaryUtils {
      * @throws BinaryObjectException If merge failed due to metadata conflict.
      */
     public static BinaryMetadata mergeMetadata(@Nullable BinaryMetadata oldMeta, BinaryMetadata newMeta) {
+        return mergeMetadata(oldMeta, newMeta, null);
+    }
+
+    /**
+     * Merge old and new metas.
+     *
+     * @param oldMeta Old meta.
+     * @param newMeta New meta.
+     * @param changedSchemas Set for holding changed schemas.
+     * @return New meta if old meta was null, old meta if no changes detected, merged meta otherwise.
+     * @throws BinaryObjectException If merge failed due to metadata conflict.
+     */
+    public static BinaryMetadata mergeMetadata(@Nullable BinaryMetadata oldMeta, BinaryMetadata newMeta,
+        @Nullable Set<Integer> changedSchemas) {
         assert newMeta != null;
 
-        if (oldMeta == null)
+        if (oldMeta == null) {
+            if (changedSchemas != null) {
+                for (BinarySchema schema : newMeta.schemas())
+                    changedSchemas.add(schema.schemaId());
+            }
+
             return newMeta;
+        }
         else {
             assert oldMeta.typeId() == newMeta.typeId();
 
@@ -1036,8 +1056,12 @@ public class BinaryUtils {
             Collection<BinarySchema> mergedSchemas = new HashSet<>(oldMeta.schemas());
 
             for (BinarySchema newSchema : newMeta.schemas()) {
-                if (mergedSchemas.add(newSchema))
+                if (mergedSchemas.add(newSchema)) {
                     changed = true;
+
+                    if (changedSchemas != null)
+                        changedSchemas.add(newSchema.schemaId());
+                }
             }
 
             // Return either old meta if no changes detected, or new merged meta.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
index 38450df..1c2f6f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
@@ -16,8 +16,12 @@
  */
 package org.apache.ignite.internal.processors.cache.binary;
 
+import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -42,6 +46,7 @@ import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.jetbrains.annotations.Nullable;
@@ -87,6 +92,9 @@ final class BinaryMetadataTransport {
     private final ConcurrentMap<Integer, ClientMetadataRequestFuture> clientReqSyncMap = new ConcurrentHashMap<>();
 
     /** */
+    private final ConcurrentMap<SyncKey, GridFutureAdapter<?>> schemaWaitFuts = new ConcurrentHashMap<>();
+
+    /** */
     private volatile boolean stopping;
 
     /** */
@@ -207,6 +215,21 @@ final class BinaryMetadataTransport {
     }
 
     /**
+     * Await specific schema update.
+     * @param typeId Type id.
+     * @param schemaId Schema id.
+     * @return Future which will be completed when schema is received.
+     */
+    GridFutureAdapter<?> awaitSchemaUpdate(int typeId, int schemaId) {
+        GridFutureAdapter<Object> fut = new GridFutureAdapter<>();
+
+        // The version field of SyncKey is used to hold the schemaId here.
+        GridFutureAdapter<?> oldFut = schemaWaitFuts.putIfAbsent(new SyncKey(typeId, schemaId), fut);
+
+        return oldFut == null ? fut : oldFut;
+    }
+
+    /**
      * Allows client node to request latest version of binary metadata for a given typeId from the cluster
      * in case client is able to detect that it has obsolete metadata in its local cache.
      *
@@ -259,6 +282,13 @@ final class BinaryMetadataTransport {
 
         /** {@inheritDoc} */
         @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, MetadataUpdateProposedMessage msg) {
+            if (log.isDebugEnabled())
+                log.debug("Received MetadataUpdateProposedListener [typeId=" + msg.typeId() +
+                    ", typeName=" + msg.metadata().typeName() +
+                    ", pendingVer=" + msg.pendingVersion() +
+                    ", acceptedVer=" + msg.acceptedVersion() +
+                    ", schemasCnt=" + msg.metadata().schemas().size() + ']');
+
             int typeId = msg.typeId();
 
             BinaryMetadataHolder holder = metaLocCache.get(typeId);
@@ -277,20 +307,23 @@ final class BinaryMetadataTransport {
                     acceptedVer = 0;
                 }
 
-                if (log.isDebugEnabled())
-                    log.debug("Versions are stamped on coordinator" +
-                        " [typeId=" + typeId +
-                        ", pendingVer=" + pendingVer +
-                        ", acceptedVer=" + acceptedVer + "]"
-                    );
-
                 msg.pendingVersion(pendingVer);
                 msg.acceptedVersion(acceptedVer);
 
                 BinaryMetadata locMeta = holder != null ? holder.metadata() : null;
 
                 try {
-                    BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(locMeta, msg.metadata());
+                    Set<Integer> changedSchemas = new LinkedHashSet<>();
+
+                    BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(locMeta, msg.metadata(), changedSchemas);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Versions are stamped on coordinator" +
+                                " [typeId=" + typeId +
+                                ", changedSchemas=" + changedSchemas +
+                                ", pendingVer=" + pendingVer +
+                                ", acceptedVer=" + acceptedVer + "]"
+                        );
 
                     msg.metadata(mergedMeta);
                 }
@@ -358,8 +391,10 @@ final class BinaryMetadataTransport {
                 if (!msg.rejected()) {
                     BinaryMetadata locMeta = holder != null ? holder.metadata() : null;
 
+                    Set<Integer> changedSchemas = new LinkedHashSet<>();
+
                     try {
-                        BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(locMeta, msg.metadata());
+                        BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(locMeta, msg.metadata(), changedSchemas);
 
                         BinaryMetadataHolder newHolder = new BinaryMetadataHolder(mergedMeta, pendingVer, acceptedVer);
 
@@ -382,7 +417,8 @@ final class BinaryMetadataTransport {
                         }
                         else {
                             if (log.isDebugEnabled())
-                                log.debug("Updated metadata on server node: " + newHolder);
+                                log.debug("Updated metadata on server node [holder=" + newHolder +
+                                    ", changedSchemas=" + changedSchemas + ']');
 
                             metaLocCache.put(typeId, newHolder);
                         }
@@ -463,7 +499,7 @@ final class BinaryMetadataTransport {
                 if (oldAcceptedVer >= newAcceptedVer) {
                     if (log.isDebugEnabled())
                         log.debug("Marking ack as duplicate [holder=" + holder +
-                            ", newAcceptedVer: " + newAcceptedVer + ']');
+                            ", newAcceptedVer=" + newAcceptedVer + ']');
 
                     //this is duplicate ack
                     msg.duplicated(true);
@@ -481,8 +517,26 @@ final class BinaryMetadataTransport {
 
             GridFutureAdapter<MetadataUpdateResult> fut = syncMap.get(new SyncKey(typeId, newAcceptedVer));
 
+            holder = metaLocCache.get(typeId);
+
             if (log.isDebugEnabled())
-                log.debug("Completing future " + fut + " for " + metaLocCache.get(typeId));
+                log.debug("Completing future " + fut + " for " + holder);
+
+            if (!schemaWaitFuts.isEmpty()) {
+                Iterator<Map.Entry<SyncKey, GridFutureAdapter<?>>> iter = schemaWaitFuts.entrySet().iterator();
+
+                while (iter.hasNext()) {
+                    Map.Entry<SyncKey, GridFutureAdapter<?>> entry = iter.next();
+
+                    SyncKey key = entry.getKey();
+
+                    if (key.typeId() == typeId && holder.metadata().hasSchema(key.version())) {
+                        entry.getValue().onDone();
+
+                        iter.remove();
+                    }
+                }
+            }
 
             if (fut != null)
                 fut.onDone(MetadataUpdateResult.createSuccessfulResult());
@@ -527,6 +581,11 @@ final class BinaryMetadataTransport {
         void key(SyncKey key) {
             this.key = key;
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(MetadataUpdateResultFuture.class, this);
+        }
     }
 
     /**
@@ -580,6 +639,11 @@ final class BinaryMetadataTransport {
 
             return (typeId == that.typeId) && (ver == that.ver);
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SyncKey.class, this);
+        }
     }
 
     /**
@@ -615,7 +679,7 @@ final class BinaryMetadataTransport {
                     binMetaBytes = U.marshal(ctx, metaHolder);
                 }
                 catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to marshal binary metadata for [typeId: " + typeId + "]", e);
+                    U.error(log, "Failed to marshal binary metadata for [typeId=" + typeId + ']', e);
 
                     resp.markErrorOnRequest();
                 }
@@ -670,7 +734,8 @@ final class BinaryMetadataTransport {
                     do {
                         oldHolder = metaLocCache.get(typeId);
 
-                        if (oldHolder != null && obsoleteUpdate(
+                        // typeId metadata cannot be removed after initialization.
+                        if (obsoleteUpdate(
                                 oldHolder.pendingVersion(),
                                 oldHolder.acceptedVersion(),
                                 newHolder.pendingVersion(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 4c101b2..137db9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -19,10 +19,14 @@ package org.apache.ignite.internal.processors.cache.binary;
 
 import java.io.File;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -30,6 +34,8 @@ import javax.cache.CacheException;
 import org.apache.ignite.IgniteBinary;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.binary.BinaryField;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryObjectBuilder;
@@ -39,8 +45,10 @@ import org.apache.ignite.binary.BinaryTypeConfiguration;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.BinaryConfiguration;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.UnregisteredBinaryTypeException;
 import org.apache.ignite.internal.binary.BinaryContext;
@@ -65,6 +73,7 @@ import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheUtils;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -76,7 +85,9 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T1;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiClosure;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
@@ -88,7 +99,11 @@ import org.apache.ignite.spi.discovery.IgniteDiscoveryThread;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_TEST_FEATURES_ENABLED;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAIT_SCHEMA_UPDATE;
 import static org.apache.ignite.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
 import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.BINARY_PROC;
@@ -120,6 +135,12 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
      */
     @Nullable private File binaryMetadataFileStoreDir;
 
+    /** How long to wait for a schema if no updates are in progress. */
+    private long waitSchemaTimeout = IgniteSystemProperties.getLong(IGNITE_WAIT_SCHEMA_UPDATE, 30_000);
+
+    /** For tests. */
+    public static boolean useTestBinaryCtx = false;
+
     /** */
     @GridToStringExclude
     private IgniteBinary binaries;
@@ -205,7 +226,9 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
 
             BinaryMarshaller bMarsh0 = (BinaryMarshaller)marsh;
 
-            binaryCtx = new BinaryContext(metaHnd, ctx.config(), ctx.log(BinaryContext.class));
+            binaryCtx = useTestBinaryCtx ?
+                new TestBinaryContext(metaHnd, ctx.config(), ctx.log(BinaryContext.class)) :
+                new BinaryContext(metaHnd, ctx.config(), ctx.log(BinaryContext.class));
 
             IgniteUtils.invoke(BinaryMarshaller.class, bMarsh0, "setBinaryContext", binaryCtx, ctx.config());
 
@@ -452,11 +475,12 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
 
             BinaryMetadata oldMeta = metaHolder != null ? metaHolder.metadata() : null;
 
-            BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0);
+            Set<Integer> changedSchemas = new LinkedHashSet<>();
+
+            BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0, changedSchemas);
 
-            //metadata requested to be added is exactly the same as already presented in the cache
-            if (mergedMeta == oldMeta)
-                return;
+            if (oldMeta != null && mergedMeta == oldMeta && metaHolder.pendingVersion() == metaHolder.acceptedVersion())
+                return; // Safe to use existing schemas.
 
             if (failIfUnregistered)
                 throw new UnregisteredBinaryTypeException(
@@ -466,7 +490,24 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
                         "dev-list.",
                     typeId, mergedMeta);
 
-            MetadataUpdateResult res = transport.requestMetadataUpdate(mergedMeta).get();
+            long t0 = System.nanoTime();
+
+            GridFutureAdapter<MetadataUpdateResult> fut = transport.requestMetadataUpdate(mergedMeta);
+
+            MetadataUpdateResult res = fut.get();
+
+            if (log.isDebugEnabled()) {
+                IgniteInternalTx tx = ctx.cache().context().tm().tx();
+
+                log.debug("Completed metadata update [typeId=" + typeId +
+                    ", typeName=" + newMeta.typeName() +
+                    ", changedSchemas=" + changedSchemas +
+                    ", waitTime=" + MILLISECONDS.convert(System.nanoTime() - t0, NANOSECONDS) + "ms" +
+                    ", holder=" + metaHolder +
+                    ", fut=" + fut +
+                    ", tx=" + CU.txString(tx) +
+                    ']');
+            }
 
             assert res != null;
 
@@ -541,9 +582,9 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
 
                 if (log.isDebugEnabled() && !fut.isDone())
                     log.debug("Waiting for update for" +
-                            " [typeId=" + typeId +
-                            ", pendingVer=" + holder.pendingVersion() +
-                            ", acceptedVer=" + holder.acceptedVersion() + "]");
+                        " [typeId=" + typeId +
+                        ", pendingVer=" + holder.pendingVersion() +
+                        ", acceptedVer=" + holder.acceptedVersion() + "]");
 
                 try {
                     fut.get();
@@ -565,40 +606,99 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
 
         if (ctx.clientNode()) {
             if (holder == null || !holder.metadata().hasSchema(schemaId)) {
+                if (log.isDebugEnabled())
+                    log.debug("Waiting for client metadata update" +
+                        " [typeId=" + typeId
+                        + ", schemaId=" + schemaId
+                        + ", pendingVer=" + (holder == null ? "NA" : holder.pendingVersion())
+                        + ", acceptedVer=" + (holder == null ? "NA" :holder.acceptedVersion()) + ']');
+
                 try {
                     transport.requestUpToDateMetadata(typeId).get();
-
-                    holder = metadataLocCache.get(typeId);
                 }
                 catch (IgniteCheckedException ignored) {
                     // No-op.
                 }
+
+                holder = metadataLocCache.get(typeId);
+
+                if (log.isDebugEnabled())
+                    log.debug("Finished waiting for client metadata update" +
+                        " [typeId=" + typeId
+                        + ", schemaId=" + schemaId
+                        + ", pendingVer=" + (holder == null ? "NA" : holder.pendingVersion())
+                        + ", acceptedVer=" + (holder == null ? "NA" :holder.acceptedVersion()) + ']');
             }
         }
-        else if (holder != null) {
-            if (IgniteThread.current() instanceof IgniteDiscoveryThread)
+        else {
+            if (holder != null && IgniteThread.current() instanceof IgniteDiscoveryThread)
                 return holder.metadata().wrap(binaryCtx);
+            else if (holder != null && (holder.pendingVersion() - holder.acceptedVersion() > 0)) {
+                if (log.isDebugEnabled())
+                    log.debug("Waiting for metadata update" +
+                        " [typeId=" + typeId
+                        + ", schemaId=" + schemaId
+                        + ", pendingVer=" + holder.pendingVersion()
+                        + ", acceptedVer=" + holder.acceptedVersion() + ']');
 
-            if (holder.pendingVersion() - holder.acceptedVersion() > 0) {
-                GridFutureAdapter<MetadataUpdateResult> fut = transport.awaitMetadataUpdate(
-                        typeId,
-                        holder.pendingVersion());
+                long t0 = System.nanoTime();
 
-                if (log.isDebugEnabled() && !fut.isDone())
-                    log.debug("Waiting for update for" +
-                            " [typeId=" + typeId
-                            + ", schemaId=" + schemaId
-                            + ", pendingVer=" + holder.pendingVersion()
-                            + ", acceptedVer=" + holder.acceptedVersion() + "]");
+                GridFutureAdapter<MetadataUpdateResult> fut = transport.awaitMetadataUpdate(
+                    typeId,
+                    holder.pendingVersion());
 
                 try {
                     fut.get();
                 }
+                catch (IgniteCheckedException e) {
+                    log.error("Failed to wait for metadata update [typeId=" + typeId + ", schemaId=" + schemaId + ']', e);
+                }
+
+                // Elapsed time is measured in nanoseconds and reported in milliseconds to match the "ms" suffix.
+                if (log.isDebugEnabled())
+                    log.debug("Finished waiting for metadata update [typeId=" + typeId
+                        + ", waitTime=" + MILLISECONDS.convert(System.nanoTime() - t0, NANOSECONDS) + "ms"
+                        + ", schemaId=" + schemaId
+                        + ", pendingVer=" + holder.pendingVersion()
+                        + ", acceptedVer=" + holder.acceptedVersion() + ']');
+
+                holder = metadataLocCache.get(typeId);
+            }
+            else if (holder == null || !holder.metadata().hasSchema(schemaId)) {
+                // Last resort waiting.
+                U.warn(log,
+                    "Schema is missing while no metadata updates are in progress " +
+                        "(will wait for schema update within timeout defined by IGNITE_BINARY_META_UPDATE_TIMEOUT system property)" +
+                        " [typeId=" + typeId
+                        + ", missingSchemaId=" + schemaId
+                        + ", pendingVer=" + (holder == null ? "NA" : holder.pendingVersion())
+                        + ", acceptedVer=" + (holder == null ? "NA" : holder.acceptedVersion())
+                        + ", binMetaUpdateTimeout=" + waitSchemaTimeout +']');
+
+                long t0 = System.nanoTime();
+
+                GridFutureAdapter<?> fut = transport.awaitSchemaUpdate(typeId, schemaId);
+
+                try {
+                    fut.get(waitSchemaTimeout);
+                }
+                catch (IgniteFutureTimeoutCheckedException e) {
+                    log.error("Timed out while waiting for schema update [typeId=" + typeId + ", schemaId=" +
+                        schemaId + ']');
+                }
                 catch (IgniteCheckedException ignored) {
                     // No-op.
                 }
 
                 holder = metadataLocCache.get(typeId);
+
+                // Elapsed time is measured in nanoseconds and reported in milliseconds to match the "ms" suffix.
+                if (log.isDebugEnabled() && holder != null && holder.metadata().hasSchema(schemaId))
+                    log.debug("Found the schema after wait [typeId=" + typeId
+                        + ", waitTime=" + MILLISECONDS.convert(System.nanoTime() - t0, NANOSECONDS) + "ms"
+                        + ", schemaId=" + schemaId
+                        + ", pendingVer=" + holder.pendingVersion()
+                        + ", acceptedVer=" + holder.acceptedVersion() + ']');
             }
         }
 
@@ -903,7 +1003,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
         if ((res = validateBinaryConfiguration(rmtNode)) != null)
             return res;
 
-        return validateBinaryMetadata(rmtNode.id(), (Map<Integer, BinaryMetadataHolder>) discoData.joiningNodeData());
+        return validateBinaryMetadata(rmtNode.id(), (Map<Integer, BinaryMetadataHolder>)discoData.joiningNodeData());
     }
 
     /** */
@@ -1070,4 +1170,75 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
     public void setBinaryMetadataFileStoreDir(@Nullable File binaryMetadataFileStoreDir) {
         this.binaryMetadataFileStoreDir = binaryMetadataFileStoreDir;
     }
+
+    /** Binary context for tests: notifies registered listeners around metadata reads and updates. */
+    public static class TestBinaryContext extends BinaryContext {
+        /** Listeners; {@code null} until the first {@link #addListener} call. No synchronization is applied here. */
+        private List<TestBinaryContextListener> listeners;
+
+        /**
+         * @param metaHnd Meta handler, passed to the super constructor.
+         * @param igniteCfg Ignite config, passed to the super constructor.
+         * @param log Logger, passed to the super constructor.
+         */
+        public TestBinaryContext(BinaryMetadataHandler metaHnd, IgniteConfiguration igniteCfg,
+            IgniteLogger log) {
+            super(metaHnd, igniteCfg, log);
+        }
+
+        /** {@inheritDoc} Listeners are notified after the super call with the value it returned. */
+        @Nullable @Override public BinaryType metadata(int typeId) throws BinaryObjectException {
+            BinaryType metadata = super.metadata(typeId);
+
+            if (listeners != null) {
+                for (TestBinaryContextListener listener : listeners)
+                    listener.onAfterMetadataRequest(typeId, metadata);
+            }
+
+            return metadata;
+        }
+
+        /** {@inheritDoc} Listeners are notified before delegating to the super implementation. */
+        @Override public void updateMetadata(int typeId, BinaryMetadata meta,
+            boolean failIfUnregistered) throws BinaryObjectException {
+            if (listeners != null) {
+                for (TestBinaryContextListener listener : listeners)
+                    listener.onBeforeMetadataUpdate(typeId, meta);
+            }
+
+            super.updateMetadata(typeId, meta, failIfUnregistered);
+        }
+
+        /** Callbacks invoked by {@link TestBinaryContext} around metadata operations. */
+        public interface TestBinaryContextListener {
+            /**
+             * @param typeId Type id the metadata was requested for.
+             * @param type Type returned by {@code super.metadata(typeId)}; may be {@code null}.
+             */
+            void onAfterMetadataRequest(int typeId, BinaryType type);
+
+            /**
+             * @param typeId Type id being updated.
+             * @param metadata Metadata about to be passed to the super {@code updateMetadata} call.
+             */
+            void onBeforeMetadataUpdate(int typeId, BinaryMetadata metadata);
+        }
+
+        /**
+         * @param lsnr Listener to add; ignored if the list already contains an equal listener.
+         */
+        public void addListener(TestBinaryContextListener lsnr) {
+            if (listeners == null)
+                listeners = new ArrayList<>();
+
+            if (!listeners.contains(lsnr))
+                listeners.add(lsnr);
+        }
+
+        /** Removes all registered listeners, if any were added. */
+        public void clearAllListener() {
+            if (listeners != null)
+                listeners.clear();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 048abf6..55462ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -423,6 +423,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery
     /** */
     private IgniteDiscoverySpiInternalListener internalLsnr;
 
+    /** For test purposes. */
+    private boolean skipAddrsRandomization = false;
+
     /**
      * Gets current SPI state.
      *
@@ -1881,7 +1884,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery
             }
         }
 
-        if (!res.isEmpty())
+        if (!res.isEmpty() && !skipAddrsRandomization)
             Collections.shuffle(res);
 
         return res;

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePageWriteLockUnlockTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePageWriteLockUnlockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePageWriteLockUnlockTest.java
index 84fd916..41c5882 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePageWriteLockUnlockTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePageWriteLockUnlockTest.java
@@ -100,6 +100,8 @@ public class CachePageWriteLockUnlockTest extends GridCommonAbstractTest {
 
             grid0 = startGrid(0);
 
+            grid0.cluster().active(true);
+
             preloadPartition(grid0, DEFAULT_CACHE_NAME, PARTITION);
 
             Iterator<Cache.Entry<Object, Object>> it = grid0.cache(DEFAULT_CACHE_NAME).iterator();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
index ccf4c8a..ae75caa 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
@@ -819,7 +819,12 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
             tx.commit();
         }
         catch (Throwable t) {
-            assertTrue(X.hasCause(t, TransactionTimeoutException.class));
+            boolean timedOut = X.hasCause(t, TransactionTimeoutException.class);
+
+            if (!timedOut)
+                log.error("Got unexpected exception", t);
+
+            assertTrue(timedOut);
         }
 
         assertEquals(0, client.cache(CACHE_NAME).size());

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryMetadataConcurrentUpdateWithIndexesTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryMetadataConcurrentUpdateWithIndexesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryMetadataConcurrentUpdateWithIndexesTest.java
new file mode 100644
index 0000000..fed1d7f
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryMetadataConcurrentUpdateWithIndexesTest.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.QueryIndexType;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.binary.BinaryMetadata;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiClosure;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Tests scenario for too early metadata update completion in case of multiple concurrent updates for the same schema.
+ * <p>
+ * Scenario is the following:
+ *
+ * <ul>
+ *     <li>Start 4 nodes, connect client to node 2 in topology order (starting from 1).</li>
+ *     <li>Start two concurrent transactions from client node producing same schema update.</li>
+ *     <li>Delay the second update until the first update returns to the client with a stamped propose message
+ *     and the new schema is written to the local metadata cache.</li>
+ *     <li>Unblock second update. It should correctly wait until the metadata is applied on all
+ *     nodes or tx will fail on commit.</li>
+ * </ul>
+ */
+public class BinaryMetadataConcurrentUpdateWithIndexesTest extends GridCommonAbstractTest {
+    /** */
+    private static final int FIELDS = 2;
+
+    /** */
+    private static final int MB = 1024 * 1024;
+
+    /** */
+    private static final TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} Adds the blocking discovery SPI, an indexed query entity and a transactional cache. */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+        cfg.setIncludeEventTypes(EventType.EVTS_DISCOVERY);
+        // Discovery SPI subclass that hands outgoing custom messages to a test closure.
+        BlockTcpDiscoverySpi spi = new BlockTcpDiscoverySpi();
+        // The field is looked up by name and set reflectively, so renaming it breaks this test.
+        Field rndAddrsField = U.findField(BlockTcpDiscoverySpi.class, "skipAddrsRandomization");
+
+        assertNotNull(rndAddrsField);
+
+        rndAddrsField.set(spi, true);
+
+        cfg.setDiscoverySpi(spi.setIpFinder(ipFinder));
+        // Instances whose name starts with "client" are started in client mode.
+        cfg.setClientMode(igniteInstanceName.startsWith("client"));
+        // Query entity with FIELDS string fields named s0, s1, ..., each with a sorted index.
+        QueryEntity qryEntity = new QueryEntity("java.lang.Integer", "Value");
+
+        LinkedHashMap<String, String> fields = new LinkedHashMap<>();
+
+        Collection<QueryIndex> indexes = new ArrayList<>(FIELDS);
+
+        for (int i = 0; i < FIELDS; i++) {
+            String name = "s" + i;
+
+            fields.put(name, "java.lang.String");
+
+            indexes.add(new QueryIndex(name, QueryIndexType.SORTED));
+        }
+
+        qryEntity.setFields(fields);
+
+        qryEntity.setIndexes(indexes);
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration().
+            setDefaultDataRegionConfiguration(new DataRegionConfiguration().setMaxSize(50 * MB)));
+
+        cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME).
+            setBackups(0).
+            setQueryEntities(Collections.singleton(qryEntity)).
+            setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL).
+            setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC).
+            setCacheMode(CacheMode.PARTITIONED));
+
+        return cfg;
+    }
+
+    /** Flag to start syncing metadata requests. Should skip on exchange. */
+    private volatile boolean syncMeta;
+
+    /** Metadata init latch. Both threads must request initial metadata. */
+    private CountDownLatch initMetaReq = new CountDownLatch(2);
+
+    /** Thread local flag for need of waiting local metadata update. */
+    private ThreadLocal<Boolean> delayMetadataUpdateThreadLoc = new ThreadLocal<>();
+
+    /** Latch for waiting local metadata update. */
+    public static final CountDownLatch localMetaUpdatedLatch = new CountDownLatch(1);
+
+    /** Runs two client transactions producing the same schema update while propose messages are held back. */
+    public void testMissingSchemaUpdate() throws Exception {
+        // Start order is important.
+        Ignite node0 = startGrid("node0");
+
+        Ignite node1 = startGrid("node1");
+
+        IgniteEx client0 = startGrid("client0");
+
+        CacheObjectBinaryProcessorImpl.TestBinaryContext clientCtx =
+            (CacheObjectBinaryProcessorImpl.TestBinaryContext)((CacheObjectBinaryProcessorImpl)client0.context().
+                cacheObjects()).binaryContext();
+
+        clientCtx.addListener(new CacheObjectBinaryProcessorImpl.TestBinaryContext.TestBinaryContextListener() {
+            @Override public void onAfterMetadataRequest(int typeId, BinaryType type) {
+                if (syncMeta) {
+                    try {
+                        initMetaReq.countDown();
+                        // Rendezvous: the latch has count 2, so a caller proceeds only after two requests were made.
+                        initMetaReq.await();
+                    }
+                    catch (Exception e) {
+                        throw new BinaryObjectException(e);
+                    }
+                }
+            }
+
+            @Override public void onBeforeMetadataUpdate(int typeId, BinaryMetadata metadata) {
+                // Delay one of updates until schema is locally updated on propose message.
+                if (delayMetadataUpdateThreadLoc.get() != null)
+                    await(localMetaUpdatedLatch, 5000);
+            }
+        });
+
+        Ignite node2 = startGrid("node2");
+
+        Ignite node3 = startGrid("node3");
+
+        startGrid("node4");
+
+        node0.cluster().active(true);
+
+        awaitPartitionMapExchange();
+        // From this point the client listener synchronizes metadata requests on initMetaReq.
+        syncMeta = true;
+
+        CountDownLatch clientProposeMsgBlockedLatch = new CountDownLatch(1);
+
+        AtomicBoolean clientWait = new AtomicBoolean();
+        final Object clientMux = new Object();
+
+        AtomicBoolean srvWait = new AtomicBoolean();
+        final Object srvMux = new Object();
+        // node1: hold propose messages written by threads whose name contains "client" until clientWait is set.
+        ((BlockTcpDiscoverySpi)node1.configuration().getDiscoverySpi()).setClosure((snd, msg) -> {
+            if (msg instanceof MetadataUpdateProposedMessage) {
+                if (Thread.currentThread().getName().contains("client")) {
+                    log.info("Block custom message to client0: [locNode=" + snd + ", msg=" + msg + ']');
+
+                    clientProposeMsgBlockedLatch.countDown();
+
+                    // Message to client
+                    synchronized (clientMux) {
+                        while (!clientWait.get())
+                            try {
+                                clientMux.wait();
+                            }
+                            catch (InterruptedException e) {
+                                fail();
+                            }
+                    }
+                }
+            }
+
+            return null;
+        });
+        // node2: hold propose messages with a non-zero pendingVer until srvWait is set.
+        ((BlockTcpDiscoverySpi)node2.configuration().getDiscoverySpi()).setClosure((snd, msg) -> {
+            if (msg instanceof MetadataUpdateProposedMessage) {
+                MetadataUpdateProposedMessage msg0 = (MetadataUpdateProposedMessage)msg;
+
+                int pendingVer = U.field(msg0, "pendingVer");
+
+                // Should not block propose messages until they reach coordinator.
+                if (pendingVer == 0)
+                    return null;
+
+                log.info("Block custom message to next server: [locNode=" + snd + ", msg=" + msg + ']');
+
+                // Message to next server
+                synchronized (srvMux) {
+                    while (!srvWait.get())
+                        try {
+                            srvMux.wait();
+                        }
+                        catch (InterruptedException e) {
+                            fail();
+                        }
+                }
+            }
+
+            return null;
+        });
+
+        Integer key = primaryKey(node3.cache(DEFAULT_CACHE_NAME));
+        // First transaction: any failure is only logged, so it does not fail fut0.
+        IgniteInternalFuture fut0 = runAsync(() -> {
+            try (Transaction tx = client0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                client0.cache(DEFAULT_CACHE_NAME).put(key, build(client0, "val", 0));
+
+                tx.commit();
+            }
+            catch (Throwable t) {
+                log.error("err", t);
+            }
+
+        });
+
+        // Implements test logic.
+        IgniteInternalFuture fut1 = runAsync(() -> {
+            // Wait for initial metadata received. It should be initial version: pending=0, accepted=0
+            await(initMetaReq, 5000);
+
+            // Wait for blocking proposal message to client node.
+            await(clientProposeMsgBlockedLatch, 5000);
+
+            // Unblock proposal message to client.
+            clientWait.set(true);
+
+            synchronized (clientMux) {
+                clientMux.notify();
+            }
+
+            // Give some time to apply update.
+            doSleep(3000);
+
+            // Unblock second metadata update.
+            localMetaUpdatedLatch.countDown();
+
+            // Give some time for tx to complete (success or fail). fut2 will throw an error if tx has failed on commit.
+            doSleep(3000);
+
+            // Unblock metadata message and allow for correct version acceptance.
+            srvWait.set(true);
+
+            synchronized (srvMux) {
+                srvMux.notify();
+            }
+        });
+        // Second transaction: marks its thread so the listener delays its metadata update; errors are not caught.
+        IgniteInternalFuture fut2 = runAsync(() -> {
+            delayMetadataUpdateThreadLoc.set(true);
+
+            try (Transaction tx = client0.transactions().
+                txStart(PESSIMISTIC, REPEATABLE_READ, 0, 1)) {
+                client0.cache(DEFAULT_CACHE_NAME).put(key, build(client0, "val", 0));
+
+                tx.commit();
+            }
+        });
+
+        fut0.get();
+        fut1.get();
+        fut2.get();
+    }
+
+    /**
+     * Waits for the latch and fails if it has not been released once the wait ends.
+     * @param latch Latch to wait for; its own count is verified after the wait.
+     * @param timeout Maximum wait time in milliseconds.
+     */
+    private void await(CountDownLatch latch, long timeout) {
+        try {
+            latch.await(timeout, MILLISECONDS);
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // Keep the interrupt flag; the count check below decides the outcome.
+        }
+
+        long cnt = latch.getCount(); // Check the latch that was actually awaited.
+        if (cnt != 0)
+            throw new RuntimeException("Invalid latch count after wait: " + cnt);
+    }
+
+    /**
+     * Builds a binary object of type "Value" with an int field iN and a string field sN for every given index N.
+     * @param ignite Ignite instance used to obtain the binary object builder.
+     * @param prefix Prefix prepended to the index to form each string field value.
+     * @param fields Field indexes; each is asserted to be less than FIELDS.
+     */
+    protected BinaryObject build(Ignite ignite, String prefix, int... fields) {
+        BinaryObjectBuilder builder = ignite.binary().builder("Value");
+        for (int field : fields) {
+            assertTrue(field < FIELDS);
+
+            builder.setField("i" + field, field);
+            builder.setField("s" + field, prefix + field);
+        }
+
+        return builder.build();
+    }
+
+    /**
+     * Discovery SPI that passes each outgoing custom discovery message to a test closure before the socket write.
+     */
+    protected class BlockTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** Closure called with the local node and the unwrapped custom message; {@code null} means no interception. */
+        private volatile IgniteBiClosure<ClusterNode, DiscoveryCustomMessage, Void> clo;
+
+        /**
+         * @param clo Closure to call for outgoing custom messages; its return value is not used by this class.
+         */
+        public void setClosure(IgniteBiClosure<ClusterNode, DiscoveryCustomMessage, Void> clo) {
+            this.clo = clo;
+        }
+
+        /**
+         * @param addr Node passed through to the closure (both callers pass {@code spiCtx.localNode()}).
+         * @param msg Outgoing discovery message; ignored unless it is a {@link TcpDiscoveryCustomEventMessage}.
+         */
+        private synchronized void apply(ClusterNode addr, TcpDiscoveryAbstractMessage msg) {
+            if (!(msg instanceof TcpDiscoveryCustomEventMessage))
+                return;
+
+            TcpDiscoveryCustomEventMessage cm = (TcpDiscoveryCustomEventMessage)msg;
+
+            DiscoveryCustomMessage delegate;
+
+            try {
+                DiscoverySpiCustomMessage custMsg = cm.message(marshaller(), U.resolveClassLoader(ignite().configuration()));
+
+                assertNotNull(custMsg);
+
+                delegate = ((CustomMessageWrapper)custMsg).delegate();
+
+            }
+            catch (Throwable throwable) {
+                throw new RuntimeException(throwable);
+            }
+
+            if (clo != null)
+                clo.apply(addr, delegate);
+        }
+
+        /** {@inheritDoc} Calls {@code apply} first when the SPI context is set. */
+        @Override protected void writeToSocket(
+            Socket sock,
+            TcpDiscoveryAbstractMessage msg,
+            byte[] data,
+            long timeout
+        ) throws IOException {
+            if (spiCtx != null)
+                apply(spiCtx.localNode(), msg);
+
+            super.writeToSocket(sock, msg, data, timeout);
+        }
+
+        /** {@inheritDoc} Calls {@code apply} first when the SPI context is set. */
+        @Override protected void writeToSocket(Socket sock,
+            OutputStream out,
+            TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
+            if (spiCtx != null)
+                apply(spiCtx.localNode(), msg);
+
+            super.writeToSocket(sock, out, msg, timeout);
+        }
+    }
+
+    /** {@inheritDoc} Sets the static {@code useTestBinaryCtx} flag for the duration of the test. */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        CacheObjectBinaryProcessorImpl.useTestBinaryCtx = true;
+    }
+
+    /** {@inheritDoc} Resets the static {@code useTestBinaryCtx} flag and stops all grids. */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        CacheObjectBinaryProcessorImpl.useTestBinaryCtx = false;
+
+        stopAllGrids();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index b44ff2d..8a60c7d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.BinaryMetadataConcurrentUpdateWithIndexesTest;
 import org.apache.ignite.internal.processors.cache.BinarySerializationQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.BinarySerializationQueryWithReflectiveSerializerSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheBinaryObjectsScanSelfTest;
@@ -42,6 +43,8 @@ public class IgniteBinaryCacheQueryTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheBinaryObjectsScanWithEventsSelfTest.class);
         suite.addTestSuite(BigEntryQueryTest.class);
 
+        suite.addTestSuite(BinaryMetadataConcurrentUpdateWithIndexesTest.class);
+
         //Should be adjusted. Not ready to be used with BinaryMarshaller.
         //suite.addTestSuite(GridCacheBinarySwapScanQuerySelfTest.class);
 


Mime
View raw message