atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mad...@apache.org
Subject [2/2] incubator-atlas git commit: ATLAS-1665: export optimization to reduce file-size and export-time
Date Sun, 19 Mar 2017 18:14:33 GMT
ATLAS-1665: export optimization to reduce file-size and export-time

Signed-off-by: Madhan Neethiraj <madhan@apache.org>
(cherry picked from commit 160b28740b0689d7892299fd246394508c28d2dd)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/e8d1d523
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/e8d1d523
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/e8d1d523

Branch: refs/heads/0.8-incubating
Commit: e8d1d523a864ff6f340158c5c710cd64b5b071a9
Parents: 4376678
Author: ashutoshm <amestry@hortonworks.com>
Authored: Thu Mar 16 22:03:42 2017 -0700
Committer: Madhan Neethiraj <madhan@apache.org>
Committed: Sat Mar 18 19:40:43 2017 -0700

----------------------------------------------------------------------
 .../atlas/model/instance/AtlasEntity.java       |   2 +-
 .../graph/v1/AtlasEntityGraphDiscoveryV1.java   |  24 +-
 .../store/graph/v1/AtlasEntityStoreV1.java      |   7 +-
 .../store/graph/v1/AtlasEntityStream.java       |  14 +-
 .../graph/v1/AtlasEntityStreamForImport.java    |  26 ++-
 .../store/graph/v1/EntityImportStream.java      |   4 +
 .../repository/store/graph/v1/EntityStream.java |   1 -
 .../store/graph/v1/InMemoryMapEntityStream.java |   2 -
 .../atlas/util/AtlasGremlin2QueryProvider.java  |   6 +-
 .../atlas/web/resources/ExportService.java      | 234 ++++++++++++-------
 .../org/apache/atlas/web/resources/ZipSink.java |   5 +
 .../apache/atlas/web/resources/ZipSource.java   |  42 ++--
 12 files changed, 236 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e8d1d523/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
index 4e3895d..0e277b1 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
@@ -196,6 +196,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
         }
 
         sb.append("AtlasEntity{");
+        super.toString(sb);
         sb.append("guid='").append(guid).append('\'');
         sb.append(", status=").append(status);
         sb.append(", createdBy='").append(createdBy).append('\'');
@@ -207,7 +208,6 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
         AtlasBaseTypeDef.dumpObjects(classifications, sb);
         sb.append(']');
         sb.append(", ");
-        super.toString(sb);
         sb.append('}');
 
         return sb;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e8d1d523/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
index 6c88510..12e8bb1 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
@@ -17,14 +17,6 @@
  */
 package org.apache.atlas.repository.store.graph.v1;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.TypeCategory;
@@ -34,12 +26,26 @@ import org.apache.atlas.model.instance.AtlasStruct;
 import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
 import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
 import org.apache.atlas.repository.store.graph.EntityResolver;
-import org.apache.atlas.type.*;
+import org.apache.atlas.type.AtlasArrayType;
 import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasMapType;
+import org.apache.atlas.type.AtlasStructType;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 
 public class AtlasEntityGraphDiscoveryV1 implements EntityGraphDiscovery {
     private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityGraphDiscoveryV1.class);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e8d1d523/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index cce3fca..c8ac3b6 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -159,13 +159,14 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
         int         progressReportedAtCount = 0;
 
         while (entityStream.hasNext()) {
-            AtlasEntity entity = entityStream.next();
+            AtlasEntityWithExtInfo entityWithExtInfo = entityStream.getNextEntityWithExtInfo();
+            AtlasEntity            entity            = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
 
             if(entity == null || processedGuids.contains(entity.getGuid())) {
                 continue;
             }
 
-            AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entity, entityStream);
+            AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, entityStream);
 
             EntityMutationResponse resp = createOrUpdate(oneEntityStream, false, true);
 
@@ -177,7 +178,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
             updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
             updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
 
-            if ((processedGuids.size() - progressReportedAtCount) > 10) {
+            if ((processedGuids.size() - progressReportedAtCount) > 1000) {
                 progressReportedAtCount = processedGuids.size();
 
                 LOG.info("bulkImport(): in progress.. number of entities imported: {}", progressReportedAtCount);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e8d1d523/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java
index 5d9a7d4..eb860ff 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java
@@ -24,9 +24,9 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import java.util.Iterator;
 
 public class AtlasEntityStream implements EntityStream {
-    private final AtlasEntitiesWithExtInfo entitiesWithExtInfo;
-    private final EntityStream             entityStream;
-    private Iterator<AtlasEntity>         iterator;
+    protected final AtlasEntitiesWithExtInfo entitiesWithExtInfo;
+    protected final EntityStream             entityStream;
+    private         Iterator<AtlasEntity>    iterator;
 
 
     public AtlasEntityStream(AtlasEntity entity) {
@@ -49,6 +49,12 @@ public class AtlasEntityStream implements EntityStream {
         this.entityStream        = entityStream;
     }
 
+    public AtlasEntityStream(AtlasEntityWithExtInfo entityWithExtInfo, EntityStream entityStream) {
+        this.entitiesWithExtInfo = new AtlasEntitiesWithExtInfo(entityWithExtInfo);
+        this.iterator            = this.entitiesWithExtInfo.getEntities().iterator();
+        this.entityStream        = entityStream;
+    }
+
     @Override
     public boolean hasNext() {
         return iterator.hasNext();
@@ -66,7 +72,7 @@ public class AtlasEntityStream implements EntityStream {
 
     @Override
     public AtlasEntity getByGuid(String guid) {
-        return entityStream != null ? entityStream.getByGuid(guid) : entitiesWithExtInfo.getEntity(guid);
+        return entityStream != null ?  entityStream.getByGuid(guid) : entitiesWithExtInfo.getEntity(guid);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e8d1d523/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java
index 8cb36ac..69140e6 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java
@@ -18,17 +18,29 @@
 package org.apache.atlas.repository.store.graph.v1;
 
 import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.instance.AtlasEntityHeader;
-
-import java.util.List;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 
 public class AtlasEntityStreamForImport extends AtlasEntityStream implements EntityImportStream {
-    public AtlasEntityStreamForImport(AtlasEntity entity) {
-        super(entity);
+    public AtlasEntityStreamForImport(AtlasEntityWithExtInfo entityWithExtInfo, EntityStream entityStream) {
+        super(entityWithExtInfo, entityStream);
+    }
+
+    @Override
+    public AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
+        AtlasEntity entity = next();
+
+        return entity != null ? new AtlasEntityWithExtInfo(entity, super.entitiesWithExtInfo) : null;
     }
 
-    public AtlasEntityStreamForImport(AtlasEntity entity, EntityStream entityStream) {
-        super(entity, entityStream);
+    @Override
+    public AtlasEntity getByGuid(String guid) {
+        AtlasEntity ent = super.entitiesWithExtInfo.getEntity(guid);
+
+        if(ent == null && entityStream != null) {
+            return entityStream.getByGuid(guid);
+        }
+
+        return ent;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e8d1d523/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java
index 73994b9..0f711db 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java
@@ -18,7 +18,11 @@
 package org.apache.atlas.repository.store.graph.v1;
 
 
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+
 public interface EntityImportStream extends EntityStream {
 
+    AtlasEntityWithExtInfo getNextEntityWithExtInfo();
+
     void onImportComplete(String guid);
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e8d1d523/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityStream.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityStream.java
index 4c43921..3444bfd 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityStream.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityStream.java
@@ -18,7 +18,6 @@
 package org.apache.atlas.repository.store.graph.v1;
 
 import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.instance.AtlasObjectId;
 
 public interface EntityStream {
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e8d1d523/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InMemoryMapEntityStream.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InMemoryMapEntityStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InMemoryMapEntityStream.java
index 241f6d0..68d7f11 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InMemoryMapEntityStream.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InMemoryMapEntityStream.java
@@ -19,9 +19,7 @@ package org.apache.atlas.repository.store.graph.v1;
 
 
 import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.instance.AtlasObjectId;
 
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e8d1d523/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
index 4743b73..d3413c2 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
@@ -38,11 +38,11 @@ public class AtlasGremlin2QueryProvider extends AtlasGremlinQueryProvider {
             case ENTITIES_FOR_TAG_METRIC:
                 return "g.V().has('__typeName', T.in, g.V().has('__type', 'typeSystem').filter{it.getProperty('__type.category').name() == 'TRAIT'}.'__type.name'.toSet()).groupCount{it.getProperty('__typeName')}.cap.toList()";
             case EXPORT_BY_GUID_FULL:
-                return "g.V('__guid', startGuid).bothE().bothV().has('__guid').__guid.dedup().toList()";
+                return "g.V('__guid', startGuid).bothE().bothV().has('__guid').transform{[__guid:it.__guid,isProcess:(it.__superTypeNames != null) ? it.__superTypeNames.contains('Process') : false ]}.dedup().toList()";
             case EXPORT_BY_GUID_CONNECTED_IN_EDGE:
-                return "g.V('__guid', startGuid).inE().outV().has('__guid').__guid.dedup().toList()";
+                return "g.V('__guid', startGuid).inE().outV().has('__guid').transform{[__guid:it.__guid,isProcess:(it.__superTypeNames != null) ? it.__superTypeNames.contains('Process') : false ]}.dedup().toList()";
             case EXPORT_BY_GUID_CONNECTED_OUT_EDGE:
-                return "g.V('__guid', startGuid).outE().inV().has('__guid').__guid.dedup().toList()";
+                return "g.V('__guid', startGuid).outE().inV().has('__guid').transform{[__guid:it.__guid,isProcess:(it.__superTypeNames != null) ? it.__superTypeNames.contains('Process') : false ]}.dedup().toList()";
             case EXPORT_TYPE_STARTS_WITH:
                 return "g.V().has('__typeName',typeName).filter({it.getProperty(attrName).startsWith(attrValue)}).has('__guid').__guid.toList()";
             case EXPORT_TYPE_ENDS_WITH:

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e8d1d523/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
index e123ff7..54faee0 100644
--- a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
@@ -25,6 +25,7 @@ import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasExportResult;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
 import org.apache.atlas.model.typedef.AtlasClassificationDef;
@@ -55,14 +56,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_FETCH_TYPE;
-import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_ATTR_MATCH_TYPE;
-import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_FULL;
-import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_CONNECTED;
-import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_STARTS_WITH;
-import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_CONTAINS;
-import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_MATCHES;
-import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_ENDS_WITH;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.*;
 
 public class ExportService {
     private static final Logger LOG = LoggerFactory.getLogger(ExportService.class);
@@ -119,18 +113,22 @@ public class ExportService {
         }
 
         try {
-            List<AtlasEntity> entities = getStartingEntity(item, context);
+            List<AtlasEntityWithExtInfo> entities = getStartingEntity(item, context);
 
-            for (AtlasEntity entity: entities) {
-                processEntity(entity, context, TraversalDirection.UNKNOWN);
+            for (AtlasEntityWithExtInfo entityWithExtInfo : entities) {
+                processEntity(entityWithExtInfo.getEntity().getGuid(), context);
             }
 
-            while (!context.guidsToProcessIsEmpty()) {
-                String             guid      = context.guidsToProcessRemove(0);
-                TraversalDirection direction = context.guidDirection.get(guid);
-                AtlasEntity        entity    = entityGraphRetriever.toAtlasEntity(guid);
+            while (!context.guidsToProcess.isEmpty()) {
+                while (!context.guidsToProcess.isEmpty()) {
+                    String guid = context.guidsToProcess.remove(0);
+                    processEntity(guid, context);
+                }
 
-                processEntity(entity, context, direction);
+                if (!context.guidsLineageToProcess.isEmpty()) {
+                    context.guidsToProcess.addAll(context.guidsLineageToProcess);
+                    context.guidsLineageToProcess.clear();
+                }
             }
         } catch (AtlasBaseException excp) {
             context.result.setOperationStatus(AtlasExportResult.OperationStatus.PARTIAL_SUCCESS);
@@ -143,11 +141,11 @@ public class ExportService {
         }
     }
 
-    private List<AtlasEntity> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException {
-        List<AtlasEntity> ret = new ArrayList<>();
+    private List<AtlasEntityWithExtInfo> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException {
+        List<AtlasEntityWithExtInfo> ret = new ArrayList<>();
 
         if (StringUtils.isNotEmpty(item.getGuid())) {
-            AtlasEntity entity = entityGraphRetriever.toAtlasEntity(item);
+            AtlasEntityWithExtInfo entity = entityGraphRetriever.toAtlasEntityWithExtInfo(item);
 
             if (entity != null) {
                 ret = Collections.singletonList(entity);
@@ -188,17 +186,17 @@ public class ExportService {
                 context.bindings.put("attrName", attribute.getQualifiedName());
                 context.bindings.put("attrValue", attrValue);
 
-                List<String> guids = executeGremlinQuery(queryTemplate, context);
+                List<String> guids = executeGremlinQueryForGuids(queryTemplate, context);
 
                 if (CollectionUtils.isNotEmpty(guids)) {
                     for (String guid : guids) {
-                        AtlasEntity entity = entityGraphRetriever.toAtlasEntity(guid);
+                        AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
 
-                        if (entity == null) {
+                        if (entityWithExtInfo == null) {
                             continue;
                         }
 
-                        ret.add(entity);
+                        ret.add(entityWithExtInfo);
                     }
                 }
 
@@ -211,24 +209,37 @@ public class ExportService {
         return ret;
     }
 
-    private void processEntity(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException {
+    private void processEntity(String guid, ExportContext context) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("==> processEntity({})", AtlasTypeUtil.getAtlasObjectId(entity));
+            LOG.debug("==> processEntity({})", guid);
         }
 
-        if (!context.guidsProcessed.contains(entity.getGuid())) {
-            context.guidsProcessed.add(entity.getGuid());
-            context.result.getData().getEntityCreationOrder().add(entity.getGuid());
+        if (!context.guidsProcessed.contains(guid)) {
+            TraversalDirection      direction         = context.guidDirection.get(guid);
+            AtlasEntityWithExtInfo  entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
+
+            context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid());
+
+            addEntity(entityWithExtInfo, context);
+            addTypesAsNeeded(entityWithExtInfo.getEntity().getTypeName(), context);
+            addClassificationsAsNeeded(entityWithExtInfo.getEntity(), context);
 
-            addTypesAsNeeded(entity.getTypeName(), context);
-            addClassificationsAsNeeded(entity, context);
-            addEntity(entity, context);
+            context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid());
+            getConntedEntitiesBasedOnOption(entityWithExtInfo.getEntity(), context, direction);
 
-            getConntedEntitiesBasedOnOption(entity, context, direction);
+            if(entityWithExtInfo.getReferredEntities() != null) {
+                for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
+                    addTypesAsNeeded(e.getTypeName(), context);
+                    addClassificationsAsNeeded(e, context);
+                    getConntedEntitiesBasedOnOption(e, context, direction);
+                }
+
+                context.guidsProcessed.addAll(entityWithExtInfo.getReferredEntities().keySet());
+            }
         }
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("<== processEntity({})", AtlasTypeUtil.getAtlasObjectId(entity));
+            LOG.debug("<== processEntity({})", guid);
         }
     }
 
@@ -245,7 +256,7 @@ public class ExportService {
     }
 
     private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException {
-        if (direction == TraversalDirection.UNKNOWN) {
+        if (direction == null || direction == TraversalDirection.UNKNOWN) {
             getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.INWARD);
         } else {
             if (isProcessEntity(entity)) {
@@ -272,41 +283,35 @@ public class ExportService {
             String query = getQueryForTraversalDirection(direction);
 
             if (LOG.isDebugEnabled()) {
-                LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcessSize(), query);
+                LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), query);
             }
 
             context.bindings.clear();
             context.bindings.put("startGuid", entity.getGuid());
 
-            List<String> guids = executeGremlinQuery(query, context);
+            List<HashMap<String, Object>> result = executeGremlinQuery(query, context);
 
-            if (CollectionUtils.isEmpty(guids)) {
+            if (CollectionUtils.isEmpty(result)) {
                 continue;
             }
 
-            for (String guid : guids) {
+            for (HashMap<String, Object> hashMap : result) {
+                String             guid             = (String) hashMap.get("__guid");
                 TraversalDirection currentDirection = context.guidDirection.get(guid);
+                boolean            isLineage        = (boolean) hashMap.get("isProcess");
 
                 if (currentDirection == null) {
-                    context.guidDirection.put(guid, direction);
+                    context.addToBeProcessed(isLineage, guid, direction);
 
-                    if (!context.guidsToProcessContains(guid)) {
-                        context.guidsToProcessAdd(guid);
-                    }
                 } else if (currentDirection == TraversalDirection.OUTWARD && direction == TraversalDirection.INWARD) {
-                    context.guidDirection.put(guid, direction);
-
                     // the entity should be reprocessed to get inward entities
                     context.guidsProcessed.remove(guid);
-
-                    if (!context.guidsToProcessContains(guid)) {
-                        context.guidsToProcessAdd(guid);
-                    }
+                    context.addToBeProcessed(isLineage, guid, direction);
                 }
             }
 
             if (LOG.isDebugEnabled()) {
-                LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), guids.size(), context.guidsToProcessSize());
+                LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size());
             }
         }
     }
@@ -324,7 +329,7 @@ public class ExportService {
 
     private void getEntityGuidsForFullFetch(AtlasEntity entity, ExportContext context) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcessSize());
+            LOG.debug("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
         }
 
         String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL);
@@ -332,36 +337,38 @@ public class ExportService {
         context.bindings.clear();
         context.bindings.put("startGuid", entity.getGuid());
 
-        List<String> result = executeGremlinQuery(query, context);
+        List<HashMap<String, Object>> result = executeGremlinQuery(query, context);
 
-        if (result == null) {
+        if (CollectionUtils.isEmpty(result)) {
             return;
         }
 
-        for (String guid : result) {
-            if (!context.guidsProcessed.contains(guid)) {
-                if (!context.guidsToProcessContains(guid)) {
-                    context.guidsToProcessAdd(guid);
-                }
+        for (HashMap<String, Object> hashMap : result) {
+            String  guid      = (String) hashMap.get("__guid");
+            boolean isLineage = (boolean) hashMap.get("isProcess");
 
-                context.guidDirection.put(guid, TraversalDirection.BOTH);
+            if (!context.guidsProcessed.contains(guid)) {
+                context.addToBeProcessed(isLineage, guid, TraversalDirection.BOTH);
             }
         }
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcessSize());
+            LOG.debug("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size());
         }
     }
 
-    private void addEntity(AtlasEntity entity, ExportContext context) throws AtlasBaseException {
+    private void addEntity(AtlasEntityWithExtInfo entity, ExportContext context) throws AtlasBaseException {
         context.sink.add(entity);
 
-        context.result.incrementMeticsCounter(String.format("entity:%s", entity.getTypeName()));
-        context.result.incrementMeticsCounter("entities");
-
-        if (context.guidsProcessed.size() % 10 == 0) {
-            LOG.info("export(): in progress.. number of entities exported: {}", context.guidsProcessed.size());
+        context.result.incrementMeticsCounter(String.format("entity:%s", entity.getEntity().getTypeName()));
+        if(entity.getReferredEntities() != null) {
+            for (AtlasEntity e: entity.getReferredEntities().values()) {
+                context.result.incrementMeticsCounter(String.format("entity:%s", e.getTypeName()));
+            }
         }
+
+        context.result.incrementMeticsCounter("entity:withExtInfo");
+        context.reportProgress();
     }
 
     private void addClassificationsAsNeeded(AtlasEntity entity, ExportContext context) {
@@ -394,15 +401,23 @@ public class ExportService {
         }
     }
 
-    private List<String> executeGremlinQuery(String query, ExportContext context) {
+    private List<HashMap<String, Object>> executeGremlinQuery(String query, ExportContext context) {
         try {
-            return (List<String>) atlasGraph.executeGremlinScript(context.scriptEngine, context.bindings, query, false);
+            return (List<HashMap<String, Object>>) atlasGraph.executeGremlinScript(context.scriptEngine, context.bindings, query, false);
         } catch (ScriptException e) {
             LOG.error("Script execution failed for query: ", query, e);
             return null;
         }
     }
 
+    private List<String> executeGremlinQueryForGuids(String query, ExportContext context) {
+        try {
+            return (List<String>) atlasGraph.executeGremlinScript(context.scriptEngine, context.bindings, query, false);
+        } catch (ScriptException e) {
+            LOG.error("Script execution failed for query: ", query, e);
+            return null;
+        }
+    }
 
     private enum TraversalDirection {
         UNKNOWN,
@@ -432,11 +447,57 @@ public class ExportService {
         }
     }
 
+    private class UniqueList<T> {
+        private final List<T>   list = new ArrayList<>();
+        private final Set<T>    set = new HashSet<>();
+
+        public void add(T e) {
+            if(set.contains(e)) {
+                return;
+            }
+
+            list.add(e);
+            set.add(e);
+        }
+
+        public void addAll(UniqueList<T> uniqueList) {
+            for (T item : uniqueList.list) {
+                if(set.contains(item)) continue;
+
+                set.add(item);
+                list.add(item);
+            }
+        }
+
+        public T remove(int index) {
+            T e = list.remove(index);
+            set.remove(e);
+            return e;
+        }
+
+        public boolean contains(T e) {
+            return set.contains(e);
+        }
+
+        public int size() {
+            return list.size();
+        }
+
+        public boolean isEmpty() {
+            return list.isEmpty();
+        }
+
+        public void clear() {
+            list.clear();
+            set.clear();
+        }
+    }
+
 
     private class ExportContext {
         final Set<String>                     guidsProcessed = new HashSet<>();
-        private final List<String>            guidsToProcessList = new ArrayList<>();
-        private final Set<String>             guidsToProcessSet = new HashSet<>();
+        final UniqueList<String>              guidsToProcess = new UniqueList<>();
+        final UniqueList<String>              guidsLineageToProcess = new UniqueList<>();
         final Map<String, TraversalDirection> guidDirection  = new HashMap<>();
         final AtlasExportResult               result;
         final ZipSink                         sink;
@@ -446,6 +507,8 @@ public class ExportService {
         private final ExportFetchType     fetchType;
         private final String              matchType;
 
+        private       int                 progressReportCount = 0;
+
         ExportContext(AtlasExportResult result, ZipSink sink) {
             this.result = result;
             this.sink   = sink;
@@ -481,33 +544,30 @@ public class ExportService {
         }
 
         public void clear() {
-            guidsToProcessList.clear();
-            guidsToProcessSet.clear();
+            guidsToProcess.clear();
             guidsProcessed.clear();
             guidDirection.clear();
         }
 
-        public boolean guidsToProcessIsEmpty() {
-            return this.guidsToProcessList.isEmpty();
-        }
+        public void addToBeProcessed(boolean isSuperTypeProcess, String guid, TraversalDirection direction) {
+            if(!isSuperTypeProcess) {
+                guidsToProcess.add(guid);
+            }
 
-        public String guidsToProcessRemove(int i) {
-            String s = this.guidsToProcessList.remove(i);
-            guidsToProcessSet.remove(s);
-            return s;
-        }
+            if(isSuperTypeProcess) {
+                guidsLineageToProcess.add(guid);
+            }
 
-        public int guidsToProcessSize() {
-            return this.guidsToProcessList.size();
+            guidDirection.put(guid, direction);
         }
 
-        public boolean guidsToProcessContains(String guid) {
-            return guidsToProcessSet.contains(guid);
-        }
+        public void reportProgress() {
+
+            if ((guidsProcessed.size() - progressReportCount) > 1000) {
+                progressReportCount = guidsProcessed.size();
 
-        public void guidsToProcessAdd(String guid) {
-            this.guidsToProcessList.add(guid);
-            guidsToProcessSet.add(guid);
+                LOG.info("export(): in progress.. number of entities exported: {}", this.guidsProcessed.size());
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e8d1d523/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java
index 37d9eb5..c197d41 100644
--- a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java
@@ -45,6 +45,11 @@ public class ZipSink {
         saveToZip(entity.getGuid(), jsonData);
     }
 
+    public void add(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException {
+        String jsonData = convertToJSON(entityWithExtInfo);
+        saveToZip(entityWithExtInfo.getEntity().getGuid(), jsonData);
+    }
+
     public void setResult(AtlasExportResult result) throws AtlasBaseException {
         String jsonData = convertToJSON(result);
         saveToZip(ZipExportFileNames.ATLAS_EXPORT_INFO_NAME, jsonData);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e8d1d523/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java
index a69f7fa..661542f 100644
--- a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java
@@ -17,17 +17,19 @@
  */
 package org.apache.atlas.web.resources;
 
-import org.apache.atlas.model.instance.AtlasEntityHeader;
-import org.codehaus.jackson.type.TypeReference;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -57,7 +59,7 @@ public class ZipSource implements EntityImportStream {
     public AtlasTypesDef getTypesDef() throws AtlasBaseException {
         final String fileName = ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString();
 
-        String s = getFromCache(fileName);
+        String s = (String) getFromCache(fileName);
         return convertFromJson(AtlasTypesDef.class, s);
     }
 
@@ -104,9 +106,10 @@ public class ZipSource implements EntityImportStream {
         return this.creationOrder;
     }
 
-    public AtlasEntity getEntity(String guid) throws AtlasBaseException {
-        String s = getFromCache(guid);
-        return convertFromJson(AtlasEntity.class, s);
+    public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException {
+        String s = (String) getFromCache(guid);
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, s);
+        return entityWithExtInfo;
     }
 
     private <T> T convertFromJson(TypeReference clazz, String jsonData) throws AtlasBaseException {
@@ -136,9 +139,7 @@ public class ZipSource implements EntityImportStream {
     }
 
     private String getFromCache(String entryName) {
-        if(!guidEntityJsonMap.containsKey(entryName)) return "";
-
-        return guidEntityJsonMap.get(entryName).toString();
+        return guidEntityJsonMap.get(entryName);
     }
 
     public void close() {
@@ -158,8 +159,15 @@ public class ZipSource implements EntityImportStream {
 
     @Override
     public AtlasEntity next() {
+        AtlasEntityWithExtInfo entityWithExtInfo = getNextEntityWithExtInfo();
+
+        return entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
+    }
+
+    @Override
+    public AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
         try {
-            return getEntity(this.iterator.next());
+            return getEntityWithExtInfo(this.iterator.next());
         } catch (AtlasBaseException e) {
             e.printStackTrace();
             return null;
@@ -186,10 +194,16 @@ public class ZipSource implements EntityImportStream {
         }
     }
 
+    private AtlasEntity getEntity(String guid) throws AtlasBaseException {
+        if(guidEntityJsonMap.containsKey(guid)) {
+            return getEntityWithExtInfo(guid).getEntity();
+        }
+
+        return null;
+    }
+
     @Override
     public void onImportComplete(String guid) {
-        if(guid != null) {
-            guidEntityJsonMap.remove(guid);
-        }
+        guidEntityJsonMap.remove(guid);
     }
 }



Mime
View raw message