atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mad...@apache.org
Subject [24/39] atlas git commit: ATLAS-1948: export fix to correct the import order
Date Mon, 24 Jul 2017 15:57:29 GMT
ATLAS-1948: export fix to correct the import order

Signed-off-by: Madhan Neethiraj <madhan@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/24a106b4
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/24a106b4
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/24a106b4

Branch: refs/heads/feature-odf
Commit: 24a106b4e1a1581b266a453cc6c920bcd06d7a9b
Parents: cfb6b84
Author: ashutoshm <amestry@hortonworks.com>
Authored: Tue Jul 18 17:08:34 2017 -0700
Committer: Madhan Neethiraj <madhan@apache.org>
Committed: Wed Jul 19 00:50:14 2017 -0700

----------------------------------------------------------------------
 .../atlas/repository/impexp/ExportService.java  |  22 ++--
 .../atlas/repository/impexp/ZipSource.java      |   2 +-
 .../store/graph/v1/AtlasEntityStoreV1.java      | 119 ++++++++++++++-----
 .../repository/impexp/ImportServiceTest.java    |  24 +++-
 repository/src/test/resources/ctas.zip          | Bin 0 -> 7674 bytes
 5 files changed, 124 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/24a106b4/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index 8f45e9f..de48573 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -98,7 +98,6 @@ public class ExportService {
             AtlasExportResult.OperationStatus[] statuses = processItems(request, context);
 
             processTypesDef(context);
-
             updateSinkWithOperationMetrics(context, statuses, getOperationDuration(startTime));
         } catch(Exception ex) {
             LOG.error("Operation failed: ", ex);
@@ -113,6 +112,7 @@ public class ExportService {
     }
 
     private void updateSinkWithOperationMetrics(ExportContext context, AtlasExportResult.OperationStatus[]
statuses, int duration) throws AtlasBaseException {
+        context.result.getData().getEntityCreationOrder().addAll(context.lineageProcessed);
         context.sink.setExportOrder(context.result.getData().getEntityCreationOrder());
         context.sink.setTypesDef(context.result.getData().getTypesDef());
         clearContextData(context);
@@ -201,9 +201,10 @@ public class ExportService {
                     processEntity(guid, context);
                 }
 
-                if (!context.guidsLineageToProcess.isEmpty()) {
-                    context.guidsToProcess.addAll(context.guidsLineageToProcess);
-                    context.guidsLineageToProcess.clear();
+                if (!context.lineageToProcess.isEmpty()) {
+                    context.guidsToProcess.addAll(context.lineageToProcess);
+                    context.lineageProcessed.addAll(context.lineageToProcess.getList());
+                    context.lineageToProcess.clear();
                 }
             }
         } catch (AtlasBaseException excp) {
@@ -295,7 +296,9 @@ public class ExportService {
             TraversalDirection      direction         = context.guidDirection.get(guid);
             AtlasEntityWithExtInfo  entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
 
-            context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid());
+            if(!context.lineageProcessed.contains(guid)) {
+                context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid());
+            }
 
             addEntity(entityWithExtInfo, context);
             addTypes(entityWithExtInfo.getEntity(), context);
@@ -651,13 +654,18 @@ public class ExportService {
             list.clear();
             set.clear();
         }
+
+        /**
+         * Returns a read-only view of the elements held by this {@code UniqueList}, in the order of the
+         * backing list.
+         *
+         * <p>The view is backed by the list, so it reflects later {@code add}/{@code clear} calls; callers
+         * that need a snapshot must copy it before mutating this {@code UniqueList}.
+         *
+         * @return an unmodifiable view of the backing list
+         */
+        public List<T> getList() {
+            return java.util.Collections.unmodifiableList(list);
+        }
     }
 
 
     private class ExportContext {
         final Set<String>                     guidsProcessed = new HashSet<>();
         final UniqueList<String>              guidsToProcess = new UniqueList<>();
-        final UniqueList<String>              guidsLineageToProcess = new UniqueList<>();
+        final UniqueList<String>              lineageToProcess = new UniqueList<>();
+        final Set<String>                     lineageProcessed = new HashSet<>();
         final Map<String, TraversalDirection> guidDirection  = new HashMap<>();
         final Set<String>                     entityTypes         = new HashSet<>();
         final Set<String>                     classificationTypes = new HashSet<>();
@@ -719,7 +727,7 @@ public class ExportService {
             }
 
             if(isSuperTypeProcess) {
-                guidsLineageToProcess.add(guid);
+                lineageToProcess.add(guid);
             }
 
             guidDirection.put(guid, direction);

http://git-wip-us.apache.org/repos/asf/atlas/blob/24a106b4/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
index edb816f..4c23582 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
@@ -196,7 +196,7 @@ public class ZipSource implements EntityImportStream {
             AtlasEntity entity = getEntity(guid);
             return entity;
         } catch (AtlasBaseException e) {
-            e.printStackTrace();
+            LOG.error("getByGuid: {} failed!", guid, e);
             return null;
         }
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/24a106b4/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index 5ea4ff2..f340330 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -69,7 +69,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
     private final DeleteHandlerV1           deleteHandler;
     private final AtlasTypeRegistry         typeRegistry;
     private final AtlasEntityChangeNotifier entityChangeNotifier;
-    private final EntityGraphMapper entityGraphMapper;
+    private final EntityGraphMapper         entityGraphMapper;
 
     @Inject
     public AtlasEntityStoreV1(DeleteHandlerV1 deleteHandler, AtlasTypeRegistry typeRegistry,
@@ -77,7 +77,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
         this.deleteHandler        = deleteHandler;
         this.typeRegistry         = typeRegistry;
         this.entityChangeNotifier = entityChangeNotifier;
-        this.entityGraphMapper = entityGraphMapper;
+        this.entityGraphMapper    = entityGraphMapper;
     }
 
     @Override
@@ -123,7 +123,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
     @Override
     @GraphTransaction
     public AtlasEntityWithExtInfo getByUniqueAttributes(AtlasEntityType entityType, Map<String,
Object> uniqAttributes)
-                                                                                        
   throws AtlasBaseException {
+            throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> getByUniqueAttribute({}, {})", entityType.getTypeName(), uniqAttributes);
         }
@@ -136,7 +136,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
 
         if (ret == null) {
             throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND,
entityType.getTypeName(),
-                uniqAttributes.toString());
+                    uniqAttributes.toString());
         }
 
         if (LOG.isDebugEnabled()) {
@@ -160,29 +160,36 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
         EntityMutationResponse ret = new EntityMutationResponse();
         ret.setGuidAssignments(new HashMap<String, String>());
 
-        Set<String> processedGuids          = new HashSet<>();
-        int         streamSize              = entityStream.size();
-        float       currentPercent          = 0f;
+        Set<String> processedGuids = new HashSet<>();
+        float       currentPercent = 0f;
 
-        while (entityStream.hasNext()) {
-            AtlasEntityWithExtInfo entityWithExtInfo = entityStream.getNextEntityWithExtInfo();
+        List<String> residualList = new ArrayList<>();
+        EntityImportStreamWithResidualList entityImportStreamWithResidualList = new EntityImportStreamWithResidualList(entityStream,
residualList);
+        while (entityImportStreamWithResidualList.hasNext()) {
+            AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo();
             AtlasEntity            entity            = entityWithExtInfo != null ? entityWithExtInfo.getEntity()
: null;
 
-            if(entity == null || processedGuids.contains(entity.getGuid())) {
+            if (entity == null || processedGuids.contains(entity.getGuid())) {
                 continue;
             }
 
             AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo,
entityStream);
+            try {
+                EntityMutationResponse resp = createOrUpdate(oneEntityStream, false, true);
 
-            EntityMutationResponse resp = createOrUpdate(oneEntityStream, false, true);
-            currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids,
-                    entityStream.getPosition(), streamSize, currentPercent);
+                if (resp.getGuidAssignments() != null) {
+                    ret.getGuidAssignments().putAll(resp.getGuidAssignments());
+                }
 
-            if (resp.getGuidAssignments() != null) {
-                ret.getGuidAssignments().putAll(resp.getGuidAssignments());
-            }
+                currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult,
processedGuids, entityStream.getPosition(),
+                                                     entityImportStreamWithResidualList.getStreamSize(),
currentPercent);
 
-            entityStream.onImportComplete(entity.getGuid());
+                entityStream.onImportComplete(entity.getGuid());
+            } catch (AtlasBaseException e) {
+                if (!updateResidualList(e, residualList, entityWithExtInfo.getEntity().getGuid()))
{
+                    throw e;
+                }
+            }
         }
 
         importResult.getProcessedEntities().addAll(processedGuids);
@@ -191,20 +198,28 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
         return ret;
     }
 
+    /**
+     * Queues an entity for a retry after the rest of the import stream has been processed, but only when
+     * its import failed with {@link AtlasErrorCode#INVALID_OBJECT_ID} (presumably a reference to an entity
+     * that has not been imported yet — confirm against EntityGraphMapper).
+     *
+     * @param e            exception raised while importing the entity
+     * @param residualList guids to retry once the main stream is exhausted; {@code guid} is appended to it
+     * @param guid         guid of the entity whose import failed
+     * @return {@code true} if {@code guid} was queued for a retry; {@code false} if the error has another
+     *         code and must be rethrown by the caller
+     */
+    private boolean updateResidualList(AtlasBaseException e, List<String> residualList, String guid) {
+        // constant-first comparison: a null error-code string is treated as "not INVALID_OBJECT_ID"
+        boolean isInvalidObjectId =
+                AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode().equals(e.getAtlasErrorCode().getErrorCode());
+
+        if (!isInvalidObjectId) {
+            return false;
+        }
+
+        residualList.add(guid);
+        return true;
+    }
+
     private float updateImportMetrics(AtlasEntityWithExtInfo currentEntity,
                                       EntityMutationResponse resp,
                                       AtlasImportResult importResult,
                                       Set<String> processedGuids,
                                       int currentIndex, int streamSize, float currentPercent)
{
-
         updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids,
importResult);
         updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids,
importResult);
         updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids,
importResult);
 
         String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)",
-                                            currentEntity.getEntity().getTypeName(),
-                                            currentIndex,
-                                            currentEntity.getEntity().getGuid());
+                currentEntity.getEntity().getTypeName(),
+                currentIndex,
+                currentEntity.getEntity().getGuid());
 
         return updateImportProgress(LOG, currentIndex + 1, streamSize, currentPercent, lastEntityImported);
     }
@@ -214,10 +229,10 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
         final double tolerance = 0.000001;
         final int MAX_PERCENT = 100;
 
-        float percent = (float) ((currentIndex * MAX_PERCENT)/streamSize);
+        float percent = (float) ((currentIndex * MAX_PERCENT) / streamSize);
         boolean updateLog = Double.compare(percent, currentPercent) > tolerance;
         float updatedPercent = (MAX_PERCENT < streamSize) ? percent :
-                                ((updateLog) ? ++currentPercent : currentPercent);
+                ((updateLog) ? ++currentPercent : currentPercent);
 
         if (updateLog) {
             log.info("bulkImport(): progress: {}% (of {}) - {}", (int) Math.ceil(percent),
streamSize, additionalInfo);
@@ -232,7 +247,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
         }
 
         for (AtlasEntityHeader h : list) {
-            if(processedGuids.contains(h.getGuid())) {
+            if (processedGuids.contains(h.getGuid())) {
                 continue;
             }
 
@@ -298,7 +313,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
     @Override
     @GraphTransaction
     public EntityMutationResponse updateEntityAttributeByGuid(String guid, String attrName,
Object attrValue)
-                                                              throws AtlasBaseException {
+            throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> updateEntityAttributeByGuid({}, {}, {})", guid, attrName, attrValue);
         }
@@ -490,8 +505,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
         List<AtlasClassification> updatedClassifications = new ArrayList<>();
 
         for (AtlasClassification newClassification : newClassifications) {
-            String               classificationName = newClassification.getTypeName();
-            AtlasClassification  oldClassification  = getClassification(guid, classificationName);
+            String              classificationName = newClassification.getTypeName();
+            AtlasClassification oldClassification  = getClassification(guid, classificationName);
 
             if (oldClassification == null) {
                 throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationName);
@@ -704,7 +719,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
 
     /**
      * Validate if classification is not already associated with the entities
-     * @param guid unique entity id
+     *
+     * @param guid            unique entity id
      * @param classifications list of classifications to be associated
      */
     private void validateEntityAssociations(String guid, List<AtlasClassification>
classifications) throws AtlasBaseException {
@@ -715,7 +731,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
 
             if (CollectionUtils.isNotEmpty(entityClassifications) && entityClassifications.contains(newClassification))
{
                 throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "entity:
" + guid +
-                                             ", already associated with classification: "
+ newClassification);
+                        ", already associated with classification: " + newClassification);
             }
         }
     }
@@ -734,4 +750,43 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
 
         return ret;
     }
+
+    /**
+     * Iterates an {@link EntityImportStream} and, once the stream is exhausted, replays the entities whose
+     * guids have been appended to {@code residualList} so that their import can be retried.
+     *
+     * <p>Residual entries are replayed in rounds. A round covers the entries that were pending when it
+     * started, and guids appended while it runs form the next round. In bulkImport a guid is appended
+     * only when its import fails. So if every entry of a round is appended again, nothing was imported
+     * during that round, and another round would replay the same entries forever. In that case
+     * {@link #hasNext()} throws instead of looping (and growing {@code residualList}) without end.
+     */
+    private static class EntityImportStreamWithResidualList {
+        private final EntityImportStream stream;
+        private final List<String>       residualList;
+        private       boolean            navigateResidualList;
+        private       int                currentResidualListIndex;
+        private       int                roundStartIndex; // first residual index of the current round
+        private       int                roundEndIndex;   // end (exclusive) of the current round
+
+        public EntityImportStreamWithResidualList(EntityImportStream stream, List<String> residualList) {
+            this.stream                   = stream;
+            this.residualList             = residualList;
+            this.navigateResidualList     = false;
+            this.currentResidualListIndex = 0;
+            this.roundStartIndex          = 0;
+            this.roundEndIndex            = 0;
+        }
+
+        /**
+         * Returns the next entity of the underlying stream or, once that is exhausted, the entity of the
+         * next residual guid (the stream is first positioned on that guid).
+         */
+        public AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
+            if (navigateResidualList) {
+                stream.setPositionUsingEntityGuid(residualList.get(currentResidualListIndex++));
+            }
+
+            return stream.getNextEntityWithExtInfo();
+        }
+
+        /**
+         * @return {@code true} while the underlying stream or the residual list still has entries
+         * @throws IllegalStateException if every entry of the last residual round was appended again,
+         *                               i.e. further retries cannot make progress
+         */
+        public boolean hasNext() {
+            if (!navigateResidualList) {
+                if (stream.hasNext()) {
+                    return true;
+                }
+
+                // stream exhausted: the first residual round covers everything queued so far
+                navigateResidualList = true;
+                roundEndIndex        = residualList.size();
+            }
+
+            if (currentResidualListIndex == roundEndIndex && roundEndIndex > roundStartIndex) {
+                startNextRound();
+            }
+
+            return currentResidualListIndex < residualList.size();
+        }
+
+        public int getStreamSize() {
+            return stream.size() + residualList.size();
+        }
+
+        // Closes the finished round [roundStartIndex, roundEndIndex) and opens the next one over the guids
+        // appended while it ran; fails when all of the round's entries were appended again.
+        // NOTE(review): this surfaces as an unchecked exception from bulkImport — confirm callers handle it.
+        private void startNextRound() {
+            int roundSize     = roundEndIndex - roundStartIndex;
+            int requeuedCount = residualList.size() - roundEndIndex;
+
+            if (requeuedCount >= roundSize) {
+                throw new IllegalStateException("bulkImport(): retrying residual entities made no progress; giving up on: "
+                                                + residualList.subList(roundEndIndex, residualList.size()));
+            }
+
+            roundStartIndex = roundEndIndex;
+            roundEndIndex   = residualList.size();
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/24a106b4/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
index de8e7ef..404225c 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -133,4 +133,22 @@ public class ImportServiceTest {
         assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName));
         assertEquals(typeDefStore.getEnumDefByName(newEnumDefName).getElementDefs().size(),
8);
     }
+
+    /**
+     * Data provider named {@code "ctas"}: supplies the {@code ctas.zip} test archive to tests declaring
+     * {@code dataProvider = "ctas"}.
+     */
+    @DataProvider(name = "ctas")
+    public static Object[][] getDataFromCtas(ITestContext context) throws IOException {
+        final String archiveName = "ctas.zip";
+
+        return getZipSource(archiveName);
+    }
+
+    /**
+     * Loads the base and hive models, then imports the {@code ctas.zip} archive with the default import
+     * request.
+     *
+     * <p>NOTE(review): this method makes no assertions of its own. It relies on
+     * {@code runImportWithParameters} (helper not shown here) to fail when the import fails.
+     */
+    @Test(dataProvider = "ctas")
+    public void importCTAS(ZipSource zipSource) throws IOException, AtlasBaseException {
+        loadModelFromJson("0010-base_model.json", typeDefStore, typeRegistry);
+        loadModelFromJson("0030-hive_model.json", typeDefStore, typeRegistry);
+
+        AtlasImportRequest request = getDefaultImportRequest();
+        runImportWithParameters(getImportService(), request, zipSource);
+    }
+
+    /** Builds an {@link ImportService} over this test's type-def store, entity store and type registry. */
+    private ImportService getImportService() {
+        ImportService service = new ImportService(typeDefStore, entityStore, typeRegistry);
+
+        return service;
+    }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/24a106b4/repository/src/test/resources/ctas.zip
----------------------------------------------------------------------
diff --git a/repository/src/test/resources/ctas.zip b/repository/src/test/resources/ctas.zip
new file mode 100644
index 0000000..a77966c
Binary files /dev/null and b/repository/src/test/resources/ctas.zip differ


Mime
View raw message