atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: mad...@apache.org
Subject: incubator-atlas git commit: ATLAS-1472: updated type-registry to handle simultaneous updates from multiple threads
Date: Sun, 29 Jan 2017 02:21:15 GMT
Repository: incubator-atlas
Updated Branches:
  refs/heads/master 57f4f79d6 -> 4b8b9e22f


ATLAS-1472: updated type-registry to handle simultaneous updates from multiple threads


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/4b8b9e22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/4b8b9e22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/4b8b9e22

Branch: refs/heads/master
Commit: 4b8b9e22f8e279f739899bc50e20ffdae6142586
Parents: 57f4f79
Author: Madhan Neethiraj <madhan@apache.org>
Authored: Fri Jan 27 03:22:26 2017 -0800
Committer: Madhan Neethiraj <madhan@apache.org>
Committed: Sat Jan 28 17:47:43 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/atlas/AtlasErrorCode.java   |   1 +
 .../apache/atlas/type/AtlasTypeRegistry.java    | 117 ++++++++-
 .../org/apache/atlas/type/AtlasTypeUtil.java    |  12 +-
 .../org/apache/atlas/model/ModelTestUtil.java   |  36 ++-
 .../apache/atlas/type/TestAtlasEntityType.java  |  51 ++--
 .../atlas/type/TestAtlasTypeRegistry.java       | 185 ++++++++++++++-
 .../store/graph/AtlasTypeDefGraphStore.java     | 236 ++++++-------------
 .../graph/v1/AtlasTypeDefGraphStoreV1.java      |   8 +-
 .../util/AtlasRepositoryConfiguration.java      |  21 ++
 9 files changed, 450 insertions(+), 217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
index e7dbb1c..0fb16c6 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
@@ -76,6 +76,7 @@ public enum AtlasErrorCode {
     INTERNAL_ERROR(500, "ATLAS5001E", "Internal server error {0}"),
     INDEX_CREATION_FAILED(500, "ATLAS5002E", "Index creation failed for {0}"),
     INDEX_ROLLBACK_FAILED(500, "ATLAS5003E", "Index rollback failed for {0}"),
+    FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK(500, "ATLAS5004E", "Failed to get the lock; another type update might be in progress. Please try again"),
 
     INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND(400, "ATLAS40018E", "Instance {0} with unique attribute {1} does not exist"),
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java b/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
index 3de0215..3f3ea59 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
@@ -38,6 +38,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_PREFIX;
 import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_SUFFIX;
@@ -51,15 +53,20 @@ import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_MAP_SUF
 @Singleton
 public class AtlasTypeRegistry {
     private static final Logger LOG = LoggerFactory.getLogger(AtlasStructType.class);
+    private static final int    DEFAULT_LOCK_MAX_WAIT_TIME_IN_SECONDS = 15;
 
-    protected RegistryData registryData;
+    protected       RegistryData                   registryData;
+    private   final TypeRegistryUpdateSynchronizer updateSynchronizer;
 
     public AtlasTypeRegistry() {
-        registryData = new RegistryData();
+        registryData       = new RegistryData();
+        updateSynchronizer = new TypeRegistryUpdateSynchronizer(this);
     }
 
+    // used only by AtlasTransientTypeRegistry
     protected AtlasTypeRegistry(AtlasTypeRegistry other) {
-        registryData = new RegistryData(other.registryData);
+        registryData       = new RegistryData(other.registryData);
+        updateSynchronizer = other.updateSynchronizer;
     }
 
     public Collection<String> getAllTypeNames() { return registryData.allTypes.getAllTypeNames(); }
@@ -195,14 +202,19 @@ public class AtlasTypeRegistry {
     public AtlasEntityType getEntityTypeByName(String name) { return registryData.entityDefs.getTypeByName(name); }
 
 
-    public AtlasTransientTypeRegistry createTransientTypeRegistry() {
-        return new AtlasTransientTypeRegistry(this);
+    public AtlasTransientTypeRegistry lockTypeRegistryForUpdate() throws AtlasBaseException {
+        return lockTypeRegistryForUpdate(DEFAULT_LOCK_MAX_WAIT_TIME_IN_SECONDS);
     }
 
-    public void commitTransientTypeRegistry(AtlasTransientTypeRegistry transientTypeRegistry) {
-        this.registryData = transientTypeRegistry.registryData;
+    public AtlasTransientTypeRegistry lockTypeRegistryForUpdate(int lockMaxWaitTimeInSeconds) throws AtlasBaseException {
+        return updateSynchronizer.lockTypeRegistryForUpdate(lockMaxWaitTimeInSeconds);
     }
 
+    public void releaseTypeRegistryForUpdate(AtlasTransientTypeRegistry transientTypeRegistry, boolean commitUpdates) {
+        updateSynchronizer.releaseTypeRegistryForUpdate(transientTypeRegistry, commitUpdates);
+    }
+
+
     static class RegistryData {
         final TypeCache                                                       allTypes;
         final TypeDefCache<AtlasEnumDef, AtlasEnumType>                       enumDefs;
@@ -519,12 +531,16 @@ public class AtlasTypeRegistry {
         public List<AtlasBaseTypeDef> getDeleteedTypes() { return deletedTypes; }
 
 
-        private void addTypeWithNoRefResolve(AtlasBaseTypeDef typeDef) {
+        private void addTypeWithNoRefResolve(AtlasBaseTypeDef typeDef) throws AtlasBaseException{
             if (LOG.isDebugEnabled()) {
                 LOG.debug("==> AtlasTypeRegistry.addTypeWithNoRefResolve({})", typeDef);
             }
 
             if (typeDef != null) {
+                if (this.isRegisteredType(typeDef.getName())) {
+                    throw new AtlasBaseException(AtlasErrorCode.TYPE_ALREADY_EXISTS, typeDef.getName());
+                }
+
                 if (typeDef.getClass().equals(AtlasEnumDef.class)) {
                     AtlasEnumDef enumDef = (AtlasEnumDef) typeDef;
 
@@ -552,7 +568,7 @@ public class AtlasTypeRegistry {
             }
         }
 
-        private void addTypesWithNoRefResolve(Collection<? extends AtlasBaseTypeDef> typeDefs) {
+        private void addTypesWithNoRefResolve(Collection<? extends AtlasBaseTypeDef> typeDefs) throws AtlasBaseException {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("==> AtlasTypeRegistry.addTypesWithNoRefResolve(length={})",
                           (typeDefs == null ? 0 : typeDefs.size()));
@@ -681,6 +697,89 @@ public class AtlasTypeRegistry {
             }
         }
     }
+
+    static class TypeRegistryUpdateSynchronizer {
+        private final AtlasTypeRegistry typeRegistry;
+        private final ReentrantLock     typeRegistryUpdateLock;
+        private AtlasTransientTypeRegistry typeRegistryUnderUpdate = null;
+        private String                     lockedByThread          = null;
+
+        TypeRegistryUpdateSynchronizer(AtlasTypeRegistry typeRegistry) {
+            this.typeRegistry           = typeRegistry;
+            this.typeRegistryUpdateLock = new ReentrantLock();
+        }
+
+        AtlasTransientTypeRegistry lockTypeRegistryForUpdate(int lockMaxWaitTimeInSeconds) throws AtlasBaseException {
+            LOG.debug("==> lockTypeRegistryForUpdate()");
+
+            boolean alreadyLockedByCurrentThread = typeRegistryUpdateLock.isHeldByCurrentThread();
+
+            if (!alreadyLockedByCurrentThread) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("lockTypeRegistryForUpdate(): waiting for lock to be released by thread {}", lockedByThread);
+                }
+            } else {
+                LOG.warn("lockTypeRegistryForUpdate(): already locked. currentLockCount={}",
+                         typeRegistryUpdateLock.getHoldCount());
+            }
+
+            try {
+                boolean isLocked = typeRegistryUpdateLock.tryLock(lockMaxWaitTimeInSeconds, TimeUnit.SECONDS);
+
+                if (!isLocked) {
+                    throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK);
+                }
+            } catch (InterruptedException excp) {
+                throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK, excp);
+            }
+
+            if (!alreadyLockedByCurrentThread) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("lockTypeRegistryForUpdate(): wait over..got the lock");
+                }
+
+                typeRegistryUnderUpdate = new AtlasTransientTypeRegistry(typeRegistry);
+                lockedByThread          = Thread.currentThread().getName();
+            }
+
+            LOG.debug("<== lockTypeRegistryForUpdate()");
+
+            return typeRegistryUnderUpdate;
+        }
+
+        void releaseTypeRegistryForUpdate(AtlasTransientTypeRegistry ttr, boolean commitUpdates) {
+            LOG.debug("==> releaseTypeRegistryForUpdate()");
+
+            if (typeRegistryUpdateLock.isHeldByCurrentThread()) {
+                try {
+                    if (typeRegistryUnderUpdate != ttr) {
+                        LOG.error("releaseTypeRegistryForUpdate(): incorrect typeRegistry returned for release" +
+                                  ": found=" + ttr + "; expected=" + typeRegistryUnderUpdate,
+                                  new Exception().fillInStackTrace());
+                    } else if (typeRegistryUpdateLock.getHoldCount() == 1) {
+                        if (ttr != null && commitUpdates) {
+                            typeRegistry.registryData = ttr.registryData;
+                        }
+                    }
+
+                    if (typeRegistryUpdateLock.getHoldCount() == 1) {
+                        lockedByThread          = null;
+                        typeRegistryUnderUpdate = null;
+                    } else {
+                        LOG.warn("releaseTypeRegistryForUpdate(): pendingReleaseCount={}", typeRegistryUpdateLock.getHoldCount() - 1);
+                    }
+                } finally {
+                    typeRegistryUpdateLock.unlock();
+                }
+            } else {
+                LOG.error("releaseTypeRegistryForUpdate(): current thread does not hold the lock",
+                          new Exception().fillInStackTrace());
+            }
+
+            LOG.debug("<== releaseTypeRegistryForUpdate()");
+        }
+
+    }
 }
 
 class TypeCache {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
index 089bebee..e4f1eea 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
@@ -17,7 +17,6 @@
  */
 package org.apache.atlas.type;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.atlas.AtlasErrorCode;
@@ -36,7 +35,6 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -52,21 +50,19 @@ import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_S
 import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_MAP_KEY_VAL_SEP;
 import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_MAP_PREFIX;
 import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_MAP_SUFFIX;
-import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_PARAM_REF_ATTRIBUTE;
-import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_TYPE_MAPPED_FROM_REF;
 
 /**
  * Utility methods for AtlasType/AtlasTypeDef.
  */
 public class AtlasTypeUtil {
     private static final Set<String> ATLAS_BUILTIN_TYPENAMES = new HashSet<>();
-    private static final String  NAME_REGEX   = "[a-zA-Z][a-zA-Z0-9_ ]*";
+    private static final String  NAME_REGEX         = "[a-zA-Z][a-zA-Z0-9_ ]*";
     private static final String  TRAIT_NAME_REGEX   = "[a-zA-Z][a-zA-Z0-9_ .]*";
-    private static final Pattern NAME_PATTERN = Pattern.compile(NAME_REGEX);
+    private static final Pattern NAME_PATTERN       = Pattern.compile(NAME_REGEX);
     private static final Pattern TRAIT_NAME_PATTERN = Pattern.compile(TRAIT_NAME_REGEX);
 
-    private static final String InvalidTypeNameErrorMessage = "Names must consist of a letter followed by a sequence of letter, number, or '_' characters.";
-    private static final String InvalidTraitTypeNameErrorMessage = "Names must consist of a leter followed by a sequence of letters,  numbers, '.', or '_' characters.";
+    private static final String InvalidTypeNameErrorMessage      = "Name must consist of a letter followed by a sequence of [ letter, number, '_' ] characters.";
+    private static final String InvalidTraitTypeNameErrorMessage = "Name must consist of a letter followed by a sequence of [ letter,  number, '_', '.' ] characters.";
 
     static {
         Collections.addAll(ATLAS_BUILTIN_TYPENAMES, AtlasBaseTypeDef.ATLAS_BUILTIN_TYPES);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/intg/src/test/java/org/apache/atlas/model/ModelTestUtil.java
----------------------------------------------------------------------
diff --git a/intg/src/test/java/org/apache/atlas/model/ModelTestUtil.java b/intg/src/test/java/org/apache/atlas/model/ModelTestUtil.java
index 6d3c312..5c72470 100644
--- a/intg/src/test/java/org/apache/atlas/model/ModelTestUtil.java
+++ b/intg/src/test/java/org/apache/atlas/model/ModelTestUtil.java
@@ -158,16 +158,21 @@ public final class  ModelTestUtil {
             ret.setDefaultValue(ret.getElementDefs().get(idxDefault).getValue());
         }
 
+        AtlasTransientTypeRegistry ttr    = null;
+        boolean                    commit = false;
+
         try {
-            AtlasTransientTypeRegistry ttr = typesRegistry.createTransientTypeRegistry();
+            ttr = typesRegistry.lockTypeRegistryForUpdate();
 
             ttr.addType(ret);
 
-            typesRegistry.commitTransientTypeRegistry(ttr);
+            commit = true;
         } catch (AtlasBaseException excp) {
             LOG.error("failed to create enum-def", excp);
 
             ret = null;
+        } finally {
+            typesRegistry.releaseTypeRegistryForUpdate(ttr, commit);
         }
 
         return ret;
@@ -186,16 +191,21 @@ public final class  ModelTestUtil {
         ret.setDescription(ret.getName());
         ret.setAttributeDefs(newAttributeDefsWithAllBuiltInTypes(PREFIX_ATTRIBUTE_NAME));
 
+        AtlasTransientTypeRegistry ttr    = null;
+        boolean                    commit = false;
+
         try {
-            AtlasTransientTypeRegistry ttr = typesRegistry.createTransientTypeRegistry();
+            ttr = typesRegistry.lockTypeRegistryForUpdate();
 
             ttr.addType(ret);
 
-            typesRegistry.commitTransientTypeRegistry(ttr);
+            commit = true;
         } catch (AtlasBaseException excp) {
             LOG.error("failed to create struct-def", excp);
 
             ret = null;
+        } finally {
+            typesRegistry.releaseTypeRegistryForUpdate(ttr, commit);
         }
 
         return ret;
@@ -228,16 +238,21 @@ public final class  ModelTestUtil {
             }
         }
 
+        AtlasTransientTypeRegistry ttr    = null;
+        boolean                    commit = false;
+
         try {
-            AtlasTransientTypeRegistry ttr = typesRegistry.createTransientTypeRegistry();
+            ttr = typesRegistry.lockTypeRegistryForUpdate();
 
             ttr.addType(ret);
 
-            typesRegistry.commitTransientTypeRegistry(ttr);
+            commit = true;
         } catch (AtlasBaseException excp) {
             LOG.error("failed to create entity-def", excp);
 
             ret = null;
+        } finally {
+            typesRegistry.releaseTypeRegistryForUpdate(ttr, commit);
         }
 
         return ret;
@@ -279,16 +294,21 @@ public final class  ModelTestUtil {
             }
         }
 
+        AtlasTransientTypeRegistry ttr    = null;
+        boolean                    commit = false;
+
         try {
-            AtlasTransientTypeRegistry ttr = typesRegistry.createTransientTypeRegistry();
+            ttr = typesRegistry.lockTypeRegistryForUpdate();
 
             ttr.addType(ret);
 
-            typesRegistry.commitTransientTypeRegistry(ttr);
+            commit = true;
         } catch (AtlasBaseException excp) {
             LOG.error("failed to create classification-def", excp);
 
             ret = null;
+        } finally {
+            typesRegistry.releaseTypeRegistryForUpdate(ttr, commit);
         }
 
         return ret;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/intg/src/test/java/org/apache/atlas/type/TestAtlasEntityType.java
----------------------------------------------------------------------
diff --git a/intg/src/test/java/org/apache/atlas/type/TestAtlasEntityType.java b/intg/src/test/java/org/apache/atlas/type/TestAtlasEntityType.java
index 710840f..4e15edd 100644
--- a/intg/src/test/java/org/apache/atlas/type/TestAtlasEntityType.java
+++ b/intg/src/test/java/org/apache/atlas/type/TestAtlasEntityType.java
@@ -124,21 +124,25 @@ public class TestAtlasEntityType {
 
     @Test
     public void testForeignKeyConstraintValid() {
-        AtlasTypeRegistry    typeRegistry = new AtlasTypeRegistry();
-        List<AtlasEntityDef> entityDefs   = new ArrayList<>();
-        String               failureMsg   = null;
+        AtlasTypeRegistry          typeRegistry = new AtlasTypeRegistry();
+        AtlasTransientTypeRegistry ttr          = null;
+        boolean                    commit       = false;
+        List<AtlasEntityDef>       entityDefs   = new ArrayList<>();
+        String                     failureMsg   = null;
 
         entityDefs.add(createTableEntityDef());
         entityDefs.add(createColumnEntityDef());
 
         try {
-            AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+            ttr = typeRegistry.lockTypeRegistryForUpdate();
 
             ttr.addTypes(entityDefs);
 
-            typeRegistry.commitTransientTypeRegistry(ttr);
+            commit = true;
         } catch (AtlasBaseException excp) {
             failureMsg = excp.getMessage();
+        } finally {
+            typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
         }
         assertNull(failureMsg, "failed to create types my_table and my_column");
     }
@@ -151,55 +155,68 @@ public class TestAtlasEntityType {
 
         entityDefs.add(createTableEntityDef());
 
+        AtlasTransientTypeRegistry ttr    = null;
+        boolean                    commit = false;
+
         try {
-            AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+            ttr = typeRegistry.lockTypeRegistryForUpdate();
 
             ttr.addTypes(entityDefs);
 
-            typeRegistry.commitTransientTypeRegistry(ttr);
+            commit = true;
         } catch (AtlasBaseException excp) {
             failureMsg = excp.getMessage();
+        } finally {
+            typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
         }
         assertNotNull(failureMsg, "expected invalid constraint failure - unknown attribute in mappedFromRef");
     }
 
     @Test
     public void testForeignKeyConstraintInValidMappedFromRef2() {
-        AtlasTypeRegistry    typeRegistry = new AtlasTypeRegistry();
-        List<AtlasEntityDef> entityDefs   = new ArrayList<>();
-        String               failureMsg   = null;
+        AtlasTypeRegistry          typeRegistry = new AtlasTypeRegistry();
+        AtlasTransientTypeRegistry ttr          = null;
+        boolean                    commit       = false;
+        List<AtlasEntityDef>       entityDefs   = new ArrayList<>();
+        String                     failureMsg   = null;
 
         entityDefs.add(createTableEntityDefWithMissingRefAttribute());
         entityDefs.add(createColumnEntityDef());
 
         try {
-            AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+            ttr = typeRegistry.lockTypeRegistryForUpdate();
 
             ttr.addTypes(entityDefs);
 
-            typeRegistry.commitTransientTypeRegistry(ttr);
+            commit = true;
         } catch (AtlasBaseException excp) {
             failureMsg = excp.getMessage();
+        } finally {
+            typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
         }
         assertNotNull(failureMsg, "expected invalid constraint failure - missing refAttribute in mappedFromRef");
     }
 
     @Test
     public void testForeignKeyConstraintInValidForeignKey() {
-        AtlasTypeRegistry    typeRegistry = new AtlasTypeRegistry();
-        List<AtlasEntityDef> entityDefs   = new ArrayList<>();
-        String               failureMsg   = null;
+        AtlasTypeRegistry          typeRegistry = new AtlasTypeRegistry();
+        AtlasTransientTypeRegistry ttr          = null;
+        boolean                    commit       = false;
+        List<AtlasEntityDef>       entityDefs   = new ArrayList<>();
+        String                     failureMsg   = null;
 
         entityDefs.add(createColumnEntityDef());
 
         try {
-            AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+            ttr = typeRegistry.lockTypeRegistryForUpdate();
 
             ttr.addTypes(entityDefs);
 
-            typeRegistry.commitTransientTypeRegistry(ttr);
+            commit = true;
         } catch (AtlasBaseException excp) {
             failureMsg = excp.getMessage();
+        } finally {
+            typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
         }
         assertNotNull(failureMsg, "expected invalid constraint failure - unknown attribute in foreignKey");
     }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/intg/src/test/java/org/apache/atlas/type/TestAtlasTypeRegistry.java
----------------------------------------------------------------------
diff --git a/intg/src/test/java/org/apache/atlas/type/TestAtlasTypeRegistry.java b/intg/src/test/java/org/apache/atlas/type/TestAtlasTypeRegistry.java
index 60a09a1..f93a2e8 100644
--- a/intg/src/test/java/org/apache/atlas/type/TestAtlasTypeRegistry.java
+++ b/intg/src/test/java/org/apache/atlas/type/TestAtlasTypeRegistry.java
@@ -19,11 +19,8 @@ package org.apache.atlas.type;
 
 
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
-import org.apache.atlas.model.typedef.AtlasClassificationDef;
-import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.*;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
-import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.type.AtlasTypeRegistry.AtlasTransientTypeRegistry;
 import org.testng.annotations.Test;
 
@@ -31,6 +28,10 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import static org.testng.Assert.*;
 
@@ -82,17 +83,23 @@ public class TestAtlasTypeRegistry {
         typesDef.getClassificationDefs().add(classifiL2_4);
 
         AtlasTypeRegistry          typeRegistry = new AtlasTypeRegistry();
-        AtlasTransientTypeRegistry ttr          = typeRegistry.createTransientTypeRegistry();
+        AtlasTransientTypeRegistry ttr          = null;
+        boolean                    commit       = false;
         String                     failureMsg   = null;
 
         try {
+            ttr = typeRegistry.lockTypeRegistryForUpdate();
+
             ttr.addTypes(typesDef);
+
+            commit = true;
         } catch (AtlasBaseException excp) {
             failureMsg = excp.getMessage();
+        } finally {
+            typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
         }
         assertNull(failureMsg);
 
-        typeRegistry.commitTransientTypeRegistry(ttr);
 
         validateSuperTypes(typeRegistry, "L0", new HashSet<String>());
         validateSuperTypes(typeRegistry, "L1-1", new HashSet<>(Arrays.asList("L0")));
@@ -126,13 +133,20 @@ public class TestAtlasTypeRegistry {
         classifiDef1.addSuperType(classifiDef1.getName());
 
         AtlasTypeRegistry          typeRegistry = new AtlasTypeRegistry();
-        AtlasTransientTypeRegistry ttr          = typeRegistry.createTransientTypeRegistry();
+        AtlasTransientTypeRegistry ttr          = null;
+        boolean                    commit       = false;
         String                     failureMsg   = null;
 
         try {
+            ttr = typeRegistry.lockTypeRegistryForUpdate();
+
             ttr.addType(classifiDef1);
+
+            commit = true;
         } catch (AtlasBaseException excp) {
             failureMsg = excp.getMessage();
+        } finally {
+            typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
         }
         assertNotNull(failureMsg, "expected invalid supertype failure");
     }
@@ -178,13 +192,20 @@ public class TestAtlasTypeRegistry {
         typesDef.getClassificationDefs().add(classifiL2_4);
 
         AtlasTypeRegistry          typeRegistry = new AtlasTypeRegistry();
-        AtlasTransientTypeRegistry ttr          = typeRegistry.createTransientTypeRegistry();
+        AtlasTransientTypeRegistry ttr          = null;
+        boolean                    commit       = false;
         String                     failureMsg   = null;
 
         try {
+            ttr = typeRegistry.lockTypeRegistryForUpdate();
+
             ttr.addTypes(typesDef);
+
+            commit = true;
         } catch (AtlasBaseException excp) {
             failureMsg = excp.getMessage();
+        } finally {
+            typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
         }
         assertNotNull(failureMsg, "expected invalid supertype failure");
     }
@@ -235,18 +256,23 @@ public class TestAtlasTypeRegistry {
         typesDef.getEntityDefs().add(entL2_4);
 
         AtlasTypeRegistry          typeRegistry = new AtlasTypeRegistry();
-        AtlasTransientTypeRegistry ttr          = typeRegistry.createTransientTypeRegistry();
+        AtlasTransientTypeRegistry ttr          = null;
+        boolean                    commit       = false;
         String                     failureMsg   = null;
 
         try {
+            ttr = typeRegistry.lockTypeRegistryForUpdate();
+
             ttr.addTypes(typesDef);
+
+            commit = true;
         } catch (AtlasBaseException excp) {
             failureMsg = excp.getMessage();
+        } finally {
+            typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
         }
         assertNull(failureMsg);
 
-        typeRegistry.commitTransientTypeRegistry(ttr);
-
         validateSuperTypes(typeRegistry, "L0", new HashSet<String>());
         validateSuperTypes(typeRegistry, "L1-1", new HashSet<>(Arrays.asList("L0")));
         validateSuperTypes(typeRegistry, "L1-2", new HashSet<>(Arrays.asList("L0")));
@@ -279,13 +305,20 @@ public class TestAtlasTypeRegistry {
         entDef1.addSuperType(entDef1.getName());
 
         AtlasTypeRegistry          typeRegistry = new AtlasTypeRegistry();
-        AtlasTransientTypeRegistry ttr          = typeRegistry.createTransientTypeRegistry();
+        AtlasTransientTypeRegistry ttr          = null;
+        boolean                    commit       = false;
         String                     failureMsg   = null;
 
         try {
+            ttr = typeRegistry.lockTypeRegistryForUpdate();
+
             ttr.addType(entDef1);
+
+            commit = true;
         } catch (AtlasBaseException excp) {
             failureMsg = excp.getMessage();
+        } finally {
+            typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
         }
         assertNotNull(failureMsg, "expected invalid supertype failure");
     }
@@ -331,17 +364,143 @@ public class TestAtlasTypeRegistry {
         typesDef.getEntityDefs().add(entL2_4);
 
         AtlasTypeRegistry          typeRegistry = new AtlasTypeRegistry();
-        AtlasTransientTypeRegistry ttr          = typeRegistry.createTransientTypeRegistry();
+        AtlasTransientTypeRegistry ttr          = null;
+        boolean                    commit       = false;
         String                     failureMsg   = null;
 
         try {
+            ttr = typeRegistry.lockTypeRegistryForUpdate();
+
             ttr.addTypes(typesDef);
+
+            commit = true;
         } catch (AtlasBaseException excp) {
             failureMsg = excp.getMessage();
+        } finally {
+            typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
         }
         assertNotNull(failureMsg, "expected invalid supertype failure");
     }
 
+    @Test
+    public void testNestedUpdates() {
+        AtlasTypeRegistry          typeRegistry = new AtlasTypeRegistry();
+        AtlasTransientTypeRegistry ttr          = null;
+        boolean                    commit       = false;
+        String                     failureMsg   = null;
+        AtlasClassificationDef     testTag1     = new AtlasClassificationDef("testTag1");
+        AtlasClassificationDef     testTag2     = new AtlasClassificationDef("testTag2");
+
+        try {
+            ttr = typeRegistry.lockTypeRegistryForUpdate();
+
+            ttr.addType(testTag1);
+
+            // changes should not be seen in typeRegistry until lock is released
+            assertFalse(typeRegistry.isRegisteredType(testTag1.getName()),
+                        "type added should be seen in typeRegistry only after commit");
+
+            boolean isNestedUpdateSuccess = addType(typeRegistry, testTag2);
+
+            assertTrue(isNestedUpdateSuccess);
+
+            // changes made in nested commit, inside addType(), should not be seen in typeRegistry until lock is released here
+            assertFalse(typeRegistry.isRegisteredType(testTag2.getName()),
+                        "type added within nested commit should be seen in typeRegistry only after outer commit");
+
+            commit = true;
+        } catch (AtlasBaseException excp) {
+            failureMsg = excp.getMessage();
+        } finally {
+            typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
+        }
+        assertNull(failureMsg);
+        assertTrue(typeRegistry.isRegisteredType(testTag1.getName()));
+        assertTrue(typeRegistry.isRegisteredType(testTag2.getName()));
+    }
+
+    @Test
+    public void testParallelUpdates() {
+        final int    numOfThreads         =  3;
+        final int    numOfTypesPerKind    = 30;
+        final String enumTypePrefix       = "testEnum-";
+        final String structTypePrefix     = "testStruct-";
+        final String classificationPrefix = "testTag-";
+        final String entityTypePrefix     = "testEntity-";
+
+        ExecutorService executor = Executors.newFixedThreadPool(numOfThreads);
+
+        final AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry();
+
+        // update typeRegistry simultaneously in multiple threads
+        for (int threadIdx = 0; threadIdx < numOfThreads; threadIdx++) {
+            executor.submit(new Callable<Object>() {
+                @Override
+                public Object call() throws Exception {
+                    for (int i = 0; i < numOfTypesPerKind; i++) {
+                        addType(typeRegistry, new AtlasEnumDef(enumTypePrefix + i));
+                    }
+
+                    for (int i = 0; i < numOfTypesPerKind; i++) {
+                        addType(typeRegistry, new AtlasStructDef(structTypePrefix + i));
+                    }
+
+                    for (int i = 0; i < numOfTypesPerKind; i++) {
+                        addType(typeRegistry, new AtlasClassificationDef(classificationPrefix + i));
+                    }
+
+                    for (int i = 0; i < numOfTypesPerKind; i++) {
+                        addType(typeRegistry, new AtlasEntityDef(entityTypePrefix + i));
+                    }
+
+                    return null;
+                }
+            });
+        }
+
+        executor.shutdown();
+
+        try {
+            boolean isCompleted = executor.awaitTermination(60, TimeUnit.SECONDS);
+
+            assertTrue(isCompleted, "threads did not complete updating types");
+        } catch (InterruptedException excp) {
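+            // interrupted while waiting for the workers; fall through so the checks below report any type that is missing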
+        }
+
+        // verify that all types added are present in the typeRegistry
+        for (int i = 0; i < numOfTypesPerKind; i++) {
+            String enumType           = enumTypePrefix + i;
+            String structType         = structTypePrefix + i;
+            String classificationType = classificationPrefix + i;
+            String entityType         = entityTypePrefix + i;
+
+            assertNotNull(typeRegistry.getEnumDefByName(enumType), enumType + ": enum not found");
+            assertNotNull(typeRegistry.getStructDefByName(structType), structType + ": struct not found");
+            assertNotNull(typeRegistry.getClassificationDefByName(classificationType), classificationType + ": classification not found");
+            assertNotNull(typeRegistry.getEntityDefByName(entityType), entityType + ": entity not found");
+        }
+    }
+
+    private boolean addType(AtlasTypeRegistry typeRegistry, AtlasBaseTypeDef typeDef) {
+        boolean                    ret = false;
+        AtlasTransientTypeRegistry ttr = null;
+
+        try {
+            ttr = typeRegistry.lockTypeRegistryForUpdate();
+
+            ttr.addType(typeDef);
+
+            ret = true;
+        } catch (AtlasBaseException excp) {
+            // not rethrown: the failure is reported to the caller through the 'false' return value
+        } finally {
+            typeRegistry.releaseTypeRegistryForUpdate(ttr, ret);
+        }
+
+        return ret;
+    }
+
     private void validateSuperTypes(AtlasTypeRegistry typeRegistry, String typeName, Set<String> expectedSuperTypes) {
         AtlasType type = null;
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java
index f7c2931..433d09c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java
@@ -43,6 +43,7 @@ import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasStructType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.type.AtlasTypeRegistry.AtlasTransientTypeRegistry;
+import org.apache.atlas.util.AtlasRepositoryConfiguration;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.Predicate;
 import org.apache.commons.collections.Transformer;
@@ -65,14 +66,15 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
 
     private static final Logger LOG = LoggerFactory.getLogger(AtlasTypeDefGraphStore.class);
 
-    private final AtlasTypeRegistry typeRegistry;
-
+    private final AtlasTypeRegistry          typeRegistry;
     private final Set<TypeDefChangeListener> typeDefChangeListeners;
+    private final int                        typeUpdateLockMaxWaitTimeSeconds;
 
     protected AtlasTypeDefGraphStore(AtlasTypeRegistry typeRegistry,
                                      Set<TypeDefChangeListener> typeDefChangeListeners) {
-        this.typeRegistry = typeRegistry;
-        this.typeDefChangeListeners = typeDefChangeListeners;
+        this.typeRegistry                     = typeRegistry;
+        this.typeDefChangeListeners           = typeDefChangeListeners;
+        this.typeUpdateLockMaxWaitTimeSeconds = AtlasRepositoryConfiguration.getTypeUpdateLockMaxWaitTimeInSeconds();
     }
 
     protected abstract AtlasEnumDefStore getEnumDefStore(AtlasTypeRegistry typeRegistry);
@@ -85,16 +87,23 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
 
     @Override
     public void init() throws AtlasBaseException {
-        AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+        AtlasTransientTypeRegistry ttr           = null;
+        boolean                    commitUpdates = false;
 
-        AtlasTypesDef typesDef = new AtlasTypesDef(getEnumDefStore(ttr).getAll(),
-                                                   getStructDefStore(ttr).getAll(),
-                                                   getClassificationDefStore(ttr).getAll(),
-                                                   getEntityDefStore(ttr).getAll());
+        try {
+            ttr = typeRegistry.lockTypeRegistryForUpdate(typeUpdateLockMaxWaitTimeSeconds);
 
-        ttr.addTypes(typesDef);
+            AtlasTypesDef typesDef = new AtlasTypesDef(getEnumDefStore(ttr).getAll(),
+                    getStructDefStore(ttr).getAll(),
+                    getClassificationDefStore(ttr).getAll(),
+                    getEntityDefStore(ttr).getAll());
 
-        typeRegistry.commitTransientTypeRegistry(ttr);
+            ttr.addTypes(typesDef);
+
+            commitUpdates = true;
+        } finally {
+            typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates);
+        }
 
         bootstrapTypes();
     }
@@ -102,7 +111,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
     @Override
     @GraphTransaction
     public AtlasEnumDef createEnumDef(AtlasEnumDef enumDef) throws AtlasBaseException {
-        AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
 
         ttr.addType(enumDef);
 
@@ -110,22 +119,15 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
 
         ttr.updateGuid(ret.getName(), ret.getGuid());
 
-        updateTypeRegistryPostCommit(ttr);
-
         return ret;
     }
 
     @Override
     @GraphTransaction
     public List<AtlasEnumDef> getAllEnumDefs() throws AtlasBaseException {
-        List<AtlasEnumDef> ret = null;
-
         Collection<AtlasEnumDef> enumDefs = typeRegistry.getAllEnumDefs();
 
-        ret = CollectionUtils.isNotEmpty(enumDefs) ?
-                new ArrayList<>(enumDefs) : Collections.<AtlasEnumDef>emptyList();
-
-        return ret;
+        return CollectionUtils.isNotEmpty(enumDefs) ? new ArrayList<>(enumDefs) : Collections.<AtlasEnumDef>emptyList();
     }
 
     @Override
@@ -151,70 +153,53 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
     @Override
     @GraphTransaction
     public AtlasEnumDef updateEnumDefByName(String name, AtlasEnumDef enumDef) throws AtlasBaseException {
-        AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
 
         ttr.updateTypeByName(name, enumDef);
 
-        AtlasEnumDef ret = getEnumDefStore(ttr).updateByName(name, enumDef);
-
-        updateTypeRegistryPostCommit(ttr);
-
-        return ret;
+        return getEnumDefStore(ttr).updateByName(name, enumDef);
     }
 
     @Override
     @GraphTransaction
     public AtlasEnumDef updateEnumDefByGuid(String guid, AtlasEnumDef enumDef) throws AtlasBaseException {
-        AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
 
         ttr.updateTypeByGuid(guid, enumDef);
 
-        AtlasEnumDef ret = getEnumDefStore(ttr).updateByGuid(guid, enumDef);
-
-        updateTypeRegistryPostCommit(ttr);
-
-        return ret;
+        return getEnumDefStore(ttr).updateByGuid(guid, enumDef);
     }
 
     @Override
     @GraphTransaction
     public void deleteEnumDefByName(String name) throws AtlasBaseException {
-        AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
-
-        AtlasEnumDef byName = typeRegistry.getEnumDefByName(name);
+        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
 
         ttr.removeTypeByName(name);
 
         getEnumDefStore(ttr).deleteByName(name);
-
-        updateTypeRegistryPostCommit(ttr);
     }
 
     @Override
     @GraphTransaction
     public void deleteEnumDefByGuid(String guid) throws AtlasBaseException {
-        AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
-
-        AtlasEnumDef byGuid = typeRegistry.getEnumDefByGuid(guid);
+        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
 
         ttr.removeTypeByGuid(guid);
 
         getEnumDefStore(ttr).deleteByGuid(guid);
-
-        updateTypeRegistryPostCommit(ttr);
     }
 
     @Override
     @GraphTransaction
     public AtlasEnumDefs searchEnumDefs(SearchFilter filter) throws AtlasBaseException {
-        AtlasEnumDefs search = getEnumDefStore(typeRegistry).search(filter);
-        return search;
+        return getEnumDefStore(typeRegistry).search(filter);
     }
 
     @Override
     @GraphTransaction
     public AtlasStructDef createStructDef(AtlasStructDef structDef) throws AtlasBaseException {
-        AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
 
         ttr.addType(structDef);
 
@@ -222,31 +207,26 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
 
         ttr.updateGuid(ret.getName(), ret.getGuid());
 
-        updateTypeRegistryPostCommit(ttr);
-
         return ret;
     }
 
     @Override
     @GraphTransaction
     public List<AtlasStructDef> getAllStructDefs() throws AtlasBaseException {
-        List<AtlasStructDef> ret = null;
-
         Collection<AtlasStructDef> structDefs = typeRegistry.getAllStructDefs();
 
-        ret = CollectionUtils.isNotEmpty(structDefs) ?
-                new ArrayList<>(structDefs) : Collections.<AtlasStructDef>emptyList();
-
-        return ret;
+        return CollectionUtils.isNotEmpty(structDefs) ? new ArrayList<>(structDefs) : Collections.<AtlasStructDef>emptyList();
     }
 
     @Override
     @GraphTransaction
     public AtlasStructDef getStructDefByName(String name) throws AtlasBaseException {
         AtlasStructDef ret = typeRegistry.getStructDefByName(name);
+
         if (ret == null) {
             throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
         }
+
         return ret;
     }
 
@@ -254,81 +234,65 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
     @GraphTransaction
     public AtlasStructDef getStructDefByGuid(String guid) throws AtlasBaseException {
         AtlasStructDef ret = typeRegistry.getStructDefByGuid(guid);
+
         if (ret == null) {
             throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
         }
+
         return ret;
     }
 
     @Override
     @GraphTransaction
     public AtlasStructDef updateStructDefByName(String name, AtlasStructDef structDef) throws AtlasBaseException {
-        AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
 
         ttr.updateTypeByName(name, structDef);
 
-        AtlasStructDef ret = getStructDefStore(ttr).updateByName(name, structDef);
-
-        updateTypeRegistryPostCommit(ttr);
-
-        return ret;
+        return getStructDefStore(ttr).updateByName(name, structDef);
     }
 
     @Override
     @GraphTransaction
     public AtlasStructDef updateStructDefByGuid(String guid, AtlasStructDef structDef) throws AtlasBaseException {
-        AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
 
         ttr.updateTypeByGuid(guid, structDef);
 
-        AtlasStructDef ret = getStructDefStore(ttr).updateByGuid(guid, structDef);
-
-        updateTypeRegistryPostCommit(ttr);
-
-        return ret;
+        return getStructDefStore(ttr).updateByGuid(guid, structDef);
     }
 
     @Override
     @GraphTransaction
     public void deleteStructDefByName(String name) throws AtlasBaseException {
-        AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
-
-        AtlasStructDef byName = typeRegistry.getStructDefByName(name);
+        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
 
         ttr.removeTypeByName(name);
 
         getStructDefStore(ttr).deleteByName(name, null);
-
-        updateTypeRegistryPostCommit(ttr);
     }
 
     @Override
     @GraphTransaction
     public void deleteStructDefByGuid(String guid) throws AtlasBaseException {
-        AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
-
-        AtlasStructDef byGuid = typeRegistry.getStructDefByGuid(guid);
+        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
 
         ttr.removeTypeByGuid(guid);
 
         getStructDefStore(ttr).deleteByGuid(guid, null);
-
-        updateTypeRegistryPostCommit(ttr);
     }
 
     @Override
     @GraphTransaction
     public AtlasStructDefs searchStructDefs(SearchFilter filter) throws AtlasBaseException {
-        AtlasStructDefs search = getStructDefStore(typeRegistry).search(filter);
-
-        return search;
+        return getStructDefStore(typeRegistry).search(filter);
     }
 
     @Override
     @GraphTransaction
     public AtlasClassificationDef createClassificationDef(AtlasClassificationDef classificationDef)
         throws AtlasBaseException {
-        AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
 
         ttr.addType(classificationDef);
 
@@ -336,22 +300,16 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
 
         ttr.updateGuid(ret.getName(), ret.getGuid());
 
-        updateTypeRegistryPostCommit(ttr);
-
         return ret;
     }
 
     @Override
     @GraphTransaction
     public List<AtlasClassificationDef> getAllClassificationDefs() throws AtlasBaseException {
-        List<AtlasClassificationDef> ret = null;
-
         Collection<AtlasClassificationDef> classificationDefs = typeRegistry.getAllClassificationDefs();
 
-        ret = CollectionUtils.isNotEmpty(classificationDefs) ?
-                new ArrayList<>(classificationDefs) : Collections.<AtlasClassificationDef>emptyList();
-
-        return ret;
+        return CollectionUtils.isNotEmpty(classificationDefs) ? new ArrayList<>(classificationDefs)
+                                                              : Collections.<AtlasClassificationDef>emptyList();
     }
 
     @Override
@@ -362,6 +320,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
         if (ret == null) {
             throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
         }
+
         return ret;
     }
 
@@ -369,9 +328,11 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
     @GraphTransaction
     public AtlasClassificationDef getClassificationDefByGuid(String guid) throws AtlasBaseException {
         AtlasClassificationDef ret = typeRegistry.getClassificationDefByGuid(guid);
+
         if (ret == null) {
             throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
         }
+
         return ret;
     }
 
@@ -379,72 +340,54 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
     @GraphTransaction
     public AtlasClassificationDef updateClassificationDefByName(String name, AtlasClassificationDef classificationDef)
         throws AtlasBaseException {
-        AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
 
         ttr.updateTypeByName(name, classificationDef);
 
-        AtlasClassificationDef ret = getClassificationDefStore(ttr).updateByName(name, classificationDef);
-
-        updateTypeRegistryPostCommit(ttr);
-
-        return ret;
+        return getClassificationDefStore(ttr).updateByName(name, classificationDef);
     }
 
     @Override
     @GraphTransaction
     public AtlasClassificationDef updateClassificationDefByGuid(String guid, AtlasClassificationDef classificationDef)
         throws AtlasBaseException {
-        AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
 
         ttr.updateTypeByGuid(guid, classificationDef);
 
-        AtlasClassificationDef ret = getClassificationDefStore(ttr).updateByGuid(guid, classificationDef);
-
-        updateTypeRegistryPostCommit(ttr);
-
-        return ret;
+        return getClassificationDefStore(ttr).updateByGuid(guid, classificationDef);
     }
 
     @Override
     @GraphTransaction
     public void deleteClassificationDefByName(String name) throws AtlasBaseException {
-        AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
-
-        AtlasClassificationDef byName = typeRegistry.getClassificationDefByName(name);
+        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
 
         ttr.removeTypeByName(name);
 
         getClassificationDefStore(ttr).deleteByName(name, null);
-
-        updateTypeRegistryPostCommit(ttr);
     }
 
     @Override
     @GraphTransaction
     public void deleteClassificationDefByGuid(String guid) throws AtlasBaseException {
-        AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
-
-        AtlasClassificationDef byGuid = typeRegistry.getClassificationDefByGuid(guid);
+        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
 
         ttr.removeTypeByGuid(guid);
 
         getClassificationDefStore(ttr).deleteByGuid(guid, null);
-
-        updateTypeRegistryPostCommit(ttr);
     }
 
     @Override
     @GraphTransaction
     public AtlasClassificationDefs searchClassificationDefs(SearchFilter filter) throws AtlasBaseException {
-        AtlasClassificationDefs search = getClassificationDefStore(typeRegistry).search(filter);
-
-        return search;
+        return getClassificationDefStore(typeRegistry).search(filter);
     }
 
     @Override
     @GraphTransaction
     public AtlasEntityDef createEntityDef(AtlasEntityDef entityDef) throws AtlasBaseException {
-        AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
 
         ttr.addType(entityDef);
 
@@ -452,31 +395,26 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
 
         ttr.updateGuid(ret.getName(), ret.getGuid());
 
-        updateTypeRegistryPostCommit(ttr);
-
         return ret;
     }
 
     @Override
     @GraphTransaction
     public List<AtlasEntityDef> getAllEntityDefs() throws AtlasBaseException {
-        List<AtlasEntityDef> ret = null;
-
         Collection<AtlasEntityDef> entityDefs = typeRegistry.getAllEntityDefs();
 
-        ret = CollectionUtils.isNotEmpty(entityDefs) ?
-                new ArrayList<>(entityDefs) : Collections.<AtlasEntityDef>emptyList();
-
-        return ret;
+        return CollectionUtils.isNotEmpty(entityDefs) ? new ArrayList<>(entityDefs) : Collections.<AtlasEntityDef>emptyList();
     }
 
     @Override
     @GraphTransaction
     public AtlasEntityDef getEntityDefByName(String name) throws AtlasBaseException {
         AtlasEntityDef ret = typeRegistry.getEntityDefByName(name);
+
         if (ret == null) {
             throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
         }
+
         return ret;
     }
 
@@ -484,74 +422,58 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
     @GraphTransaction
     public AtlasEntityDef getEntityDefByGuid(String guid) throws AtlasBaseException {
         AtlasEntityDef ret = typeRegistry.getEntityDefByGuid(guid);
+
         if (ret == null) {
             throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
         }
+
         return ret;
     }
 
     @Override
     @GraphTransaction
     public AtlasEntityDef updateEntityDefByName(String name, AtlasEntityDef entityDef) throws AtlasBaseException {
-        AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
 
         ttr.updateTypeByName(name, entityDef);
 
-        AtlasEntityDef ret = getEntityDefStore(ttr).updateByName(name, entityDef);
-
-        updateTypeRegistryPostCommit(ttr);
-
-        return ret;
+        return getEntityDefStore(ttr).updateByName(name, entityDef);
     }
 
     @Override
     @GraphTransaction
     public AtlasEntityDef updateEntityDefByGuid(String guid, AtlasEntityDef entityDef) throws AtlasBaseException {
-        AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
 
         ttr.updateTypeByGuid(guid, entityDef);
 
-        AtlasEntityDef ret = getEntityDefStore(ttr).updateByGuid(guid, entityDef);
-
-        updateTypeRegistryPostCommit(ttr);
-
-        return ret;
+        return getEntityDefStore(ttr).updateByGuid(guid, entityDef);
     }
 
     @Override
     @GraphTransaction
     public void deleteEntityDefByName(String name) throws AtlasBaseException {
-        AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
-
-        AtlasEntityDef byName = typeRegistry.getEntityDefByName(name);
+        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
 
         ttr.removeTypeByName(name);
 
         getEntityDefStore(ttr).deleteByName(name, null);
-
-        updateTypeRegistryPostCommit(ttr);
     }
 
     @Override
     @GraphTransaction
     public void deleteEntityDefByGuid(String guid) throws AtlasBaseException {
-        AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
-
-        AtlasEntityDef byGuid = typeRegistry.getEntityDefByGuid(guid);
+        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
 
         ttr.removeTypeByGuid(guid);
 
         getEntityDefStore(ttr).deleteByGuid(guid, null);
-
-        updateTypeRegistryPostCommit(ttr);
     }
 
     @Override
     @GraphTransaction
     public AtlasEntityDefs searchEntityDefs(SearchFilter filter) throws AtlasBaseException {
-        AtlasEntityDefs search = getEntityDefStore(typeRegistry).search(filter);
-
-        return search;
+        return getEntityDefStore(typeRegistry).search(filter);
     }
 
     @Override
@@ -567,7 +489,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
 
         AtlasTypesDef ret = new AtlasTypesDef();
 
-        AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
 
         ttr.addTypes(typesDef);
 
@@ -644,8 +566,6 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
             }
         }
 
-        updateTypeRegistryPostCommit(ttr);
-
         if (LOG.isDebugEnabled()) {
             LOG.debug("<== AtlasTypeDefGraphStore.createTypesDef(enums={}, structs={}, classfications={}, entities={})",
                     CollectionUtils.size(typesDef.getEnumDefs()),
@@ -670,7 +590,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
 
         AtlasTypesDef ret = new AtlasTypesDef();
 
-        AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
 
         ttr.updateTypes(typesDef);
 
@@ -703,8 +623,6 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
             }
         }
 
-        updateTypeRegistryPostCommit(ttr);
-
         if (LOG.isDebugEnabled()) {
             LOG.debug("<== AtlasTypeDefGraphStore.updateTypesDef(enums={}, structs={}, classfications={}, entities={})",
                     CollectionUtils.size(typesDef.getEnumDefs()),
@@ -728,7 +646,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
                     CollectionUtils.size(typesDef.getEntityDefs()));
         }
 
-        AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
 
         ttr.addTypes(typesDef);
 
@@ -817,8 +735,6 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
             }
         }
 
-        updateTypeRegistryPostCommit(ttr);
-
         if (LOG.isDebugEnabled()) {
             LOG.debug("<== AtlasTypeDefGraphStore.deleteTypesDef(enums={}, structs={}, classfications={}, entities={})",
                     CollectionUtils.size(typesDef.getEnumDefs()),
@@ -934,8 +850,12 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
         storeInitializer.initializeStore(this, typeRegistry, typesDirName);
     }
 
-    private void updateTypeRegistryPostCommit(AtlasTransientTypeRegistry ttr) {
+    private AtlasTransientTypeRegistry lockTypeRegistryAndReleasePostCommit() throws AtlasBaseException {
+        AtlasTransientTypeRegistry ttr = typeRegistry.lockTypeRegistryForUpdate(typeUpdateLockMaxWaitTimeSeconds);
+
         new TypeRegistryUpdateHook(ttr);
+
+        return ttr;
     }
 
     private class TypeRegistryUpdateHook extends GraphTransactionInterceptor.PostTransactionHook {
@@ -953,9 +873,9 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
                 LOG.debug("==> TypeRegistryUpdateHook.onComplete({})", isSuccess);
             }
 
-            if (isSuccess) {
-                typeRegistry.commitTransientTypeRegistry(ttr);
+            typeRegistry.releaseTypeRegistryForUpdate(ttr, isSuccess);
 
+            if (isSuccess) {
                 notifyListeners(ttr);
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java
index 287ef09..88197ac 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java
@@ -68,7 +68,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
                                     Set<TypeDefChangeListener> typeDefChangeListeners) {
         super(typeRegistry, typeDefChangeListeners);
 
-        LOG.info("==> AtlasTypeDefGraphStoreV1()");
+        LOG.debug("==> AtlasTypeDefGraphStoreV1()");
 
         try {
             init();
@@ -76,7 +76,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
             LOG.error("failed to initialize types from graph store", excp);
         }
 
-        LOG.info("<== AtlasTypeDefGraphStoreV1()");
+        LOG.debug("<== AtlasTypeDefGraphStoreV1()");
     }
 
     @Override
@@ -101,11 +101,11 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
 
     @Override
     public void init() throws AtlasBaseException {
-        LOG.info("==> AtlasTypeDefGraphStoreV1.init()");
+        LOG.debug("==> AtlasTypeDefGraphStoreV1.init()");
 
         super.init();
 
-        LOG.info("<== AtlasTypeDefGraphStoreV1.init()");
+        LOG.debug("<== AtlasTypeDefGraphStoreV1.init()");
     }
 
     AtlasGraph getAtlasGraph() { return atlasGraph; }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java b/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java
index 6655085..71c7ff8 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java
@@ -48,6 +48,10 @@ public class AtlasRepositoryConfiguration {
     private static List<String> skippedOperations = null;
     public static final String SEPARATOR = ":";
 
+    private static final String  CONFIG_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS  = "atlas.server.type.update.lock.max.wait.time.seconds";
+    private static final Integer DEFAULT_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS = Integer.valueOf(15);
+    private static Integer typeUpdateLockMaxWaitTimeInSeconds = null;
+
     @SuppressWarnings("unchecked")
     public static Class<? extends TypeCache> getTypeCache() {
         // Get the type cache implementation class from Atlas configuration.
@@ -155,4 +159,21 @@ public class AtlasRepositoryConfiguration {
         skippedOperations = null;
     }
 
+    public static int getTypeUpdateLockMaxWaitTimeInSeconds() {
+        Integer ret = typeUpdateLockMaxWaitTimeInSeconds;
+        // resolved lazily: the static field caches the value after the first successful configuration lookup
+        if (ret == null) {
+            try {
+                Configuration config = ApplicationProperties.get();
+
+                ret = config.getInteger(CONFIG_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS, DEFAULT_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS);
+
+                typeUpdateLockMaxWaitTimeInSeconds = ret;
+            } catch (AtlasException e) {
+                // best-effort: if the configuration cannot be loaded, leave the cache unset and fall back to the default below
+            }
+        }
+
+        return ret == null ? DEFAULT_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS : ret;
+    }
 }


Mime
View raw message