atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mad...@apache.org
Subject [3/5] incubator-atlas git commit: ATLAS-1544: implementation of REST endpoints for entity create/update/bulk-get
Date Mon, 13 Feb 2017 07:43:59 GMT
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
index a0096c1..06ceaf2 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
@@ -20,6 +20,7 @@ package org.apache.atlas.repository.store.graph.v1;
 
 import com.google.inject.Inject;
 import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.AtlasException;
 import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.TypeCategory;
@@ -27,88 +28,619 @@ import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.AtlasStruct;
-import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.instance.EntityMutations;
+import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
+import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
 import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.RepositoryException;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
 import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasArrayType;
 import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasMapType;
 import org.apache.atlas.type.AtlasStructType;
 import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
-public class EntityGraphMapper implements InstanceGraphMapper<AtlasEdge> {
+import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.CREATE;
+import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
+import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
 
-    private static final Logger LOG = LoggerFactory.getLogger(EntityGraphMapper.class);
 
-    protected final GraphHelper graphHelper = GraphHelper.getInstance();
+public class EntityGraphMapper {
+    private static final Logger LOG = LoggerFactory.getLogger(EntityGraphMapper.class);
 
-    protected EntityMutationContext context;
+    private final AtlasGraph        graph       = AtlasGraphProvider.getGraphInstance();
+    private final GraphHelper       graphHelper = GraphHelper.getInstance();
+    private final DeleteHandlerV1   deleteHandler;
+    private final AtlasTypeRegistry typeRegistry;
 
-    protected final StructVertexMapper structVertexMapper;
 
     @Inject
-    public EntityGraphMapper(ArrayVertexMapper arrayVertexMapper, MapVertexMapper mapVertexMapper, DeleteHandlerV1 deleteHandler) {
-        this.structVertexMapper = new StructVertexMapper(arrayVertexMapper, mapVertexMapper, deleteHandler);
-        arrayVertexMapper.init(structVertexMapper);
-        mapVertexMapper.init(structVertexMapper);
+    public EntityGraphMapper(DeleteHandlerV1 deleteHandler, AtlasTypeRegistry typeRegistry) {
+        this.deleteHandler = deleteHandler;
+        this.typeRegistry  = typeRegistry;
     }
 
-    public AtlasVertex createVertexTemplate(final AtlasStruct instance, final AtlasStructType structType) {
-        AtlasVertex vertex = structVertexMapper.createVertexTemplate(instance, structType);
-        
-        AtlasEntityType entityType = (AtlasEntityType) structType;
-        AtlasEntity entity = (AtlasEntity) instance;
+    public AtlasVertex createVertex(AtlasEntity entity) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> createVertex({})", entity.getTypeName());
+        }
+
+        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
+
+        AtlasVertex ret = createStructVertex(entity);
 
-        // add super types
         for (String superTypeName : entityType.getAllSuperTypes()) {
-            AtlasGraphUtilsV1.addProperty(vertex, Constants.SUPER_TYPES_PROPERTY_KEY, superTypeName);
+            AtlasGraphUtilsV1.addProperty(ret, Constants.SUPER_TYPES_PROPERTY_KEY, superTypeName);
         }
 
         final String guid = UUID.randomUUID().toString();
 
-        // add identity
-        AtlasGraphUtilsV1.setProperty(vertex, Constants.GUID_PROPERTY_KEY, guid);
+        AtlasGraphUtilsV1.setProperty(ret, Constants.GUID_PROPERTY_KEY, guid);
+        AtlasGraphUtilsV1.setProperty(ret, Constants.VERSION_PROPERTY_KEY, getEntityVersion(entity));
+
+        return ret;
+    }
+
+    /**
+     * Maps the attributes of each created/updated entity in {@code context} to its vertex and returns
+     * a response holding CREATE/UPDATE headers plus a DELETE header per id from the request context.
+     */
+    public EntityMutationResponse mapAttributes(EntityMutationContext context) throws AtlasBaseException {
+        final EntityMutationResponse  response = new EntityMutationResponse();
+        final Collection<AtlasEntity> toCreate = context.getCreatedEntities();
+        final Collection<AtlasEntity> toUpdate = context.getUpdatedEntities();
+
+        if (!CollectionUtils.isEmpty(toCreate)) {
+            for (AtlasEntity entity : toCreate) {
+                final String          entityGuid = entity.getGuid();
+                final AtlasVertex     target     = context.getVertex(entityGuid);
+                final AtlasEntityType type       = context.getType(entityGuid);
+
+                mapAttributes(entity, target, CREATE, context);
+                response.addEntity(CREATE, constructHeader(entity, type, target));
+            }
+        }
+
+        if (!CollectionUtils.isEmpty(toUpdate)) {
+            for (AtlasEntity entity : toUpdate) {
+                final String          entityGuid = entity.getGuid();
+                final AtlasVertex     target     = context.getVertex(entityGuid);
+                final AtlasEntityType type       = context.getType(entityGuid);
+
+                mapAttributes(entity, target, UPDATE, context);
+                response.addEntity(UPDATE, constructHeader(entity, type, target));
+            }
+        }
+
+        // deleted ids are read from the request context rather than from the mutation context
+        for (AtlasObjectId deletedId : RequestContextV1.get().getDeletedEntityIds()) {
+            response.addEntity(DELETE, constructHeader(deletedId));
+        }
+
+        return response;
+    }
+
+    /** Adds a graph vertex stamped with the struct's type name, ACTIVE state and request time/user. */
+    private AtlasVertex createStructVertex(AtlasStruct struct) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> createStructVertex({})", struct.getTypeName());
+        }
+
+        final AtlasVertex      created = graph.addVertex();
+        final RequestContextV1 request = RequestContextV1.get();
+        AtlasGraphUtilsV1.setProperty(created, Constants.ENTITY_TYPE_PROPERTY_KEY, struct.getTypeName());
+        AtlasGraphUtilsV1.setProperty(created, Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name());
+        AtlasGraphUtilsV1.setProperty(created, Constants.TIMESTAMP_PROPERTY_KEY, request.getRequestTime());
+        AtlasGraphUtilsV1.setProperty(created, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, request.getRequestTime());
+        AtlasGraphUtilsV1.setProperty(created, Constants.CREATED_BY_KEY, request.getUser());
+        GraphHelper.setProperty(created, Constants.MODIFIED_BY_KEY, request.getUser());
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== createStructVertex({})", struct.getTypeName());
+        }
+        return created;
+    }
+
+    /**
+     * Maps a struct's attributes to {@code vertex}: CREATE walks every attribute the type declares,
+     * UPDATE only those present on the instance. Does nothing when the instance has no attributes.
+     */
+    private void mapAttributes(AtlasStruct struct, AtlasVertex vertex, EntityOperation op, EntityMutationContext context) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> mapAttributes({}, {})", op, struct.getTypeName());
+        }
+
+        if (!MapUtils.isEmpty(struct.getAttributes())) {
+            final AtlasStructType type = getStructType(struct.getTypeName());
+            switch (op) {
+                case CREATE:
+                    for (AtlasAttribute declared : type.getAllAttributes().values()) {
+                        mapAttribute(declared, struct.getAttribute(declared.getName()), vertex, op, context);
+                    }
+                    break;
+                case UPDATE:
+                    for (String name : struct.getAttributes().keySet()) {
+                        final AtlasAttribute declared = type.getAttribute(name);
+                        if (declared == null) {
+                            LOG.warn("mapAttributes(): invalid attribute {}.{}. Ignored..", struct.getTypeName(), name);
+                        } else {
+                            mapAttribute(declared, struct.getAttribute(name), vertex, op, context);
+                        }
+                    }
+                    break;
+            }
+            updateModificationMetadata(vertex);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== mapAttributes({}, {})", op, struct.getTypeName());
+        }
+    }
+
+    /**
+     * Maps one attribute value to {@code vertex}; a null value for a PRIMITIVE attribute is first
+     * replaced by the type's optional-default or default value, depending on the attribute's optionality.
+     */
+    private void mapAttribute(AtlasAttribute attribute, Object attrValue, AtlasVertex vertex, EntityOperation op, EntityMutationContext context) throws AtlasBaseException {
+        Object effectiveValue = attrValue;
+
+        if (effectiveValue == null) {
+            final AtlasType type = attribute.getAttributeType();
+
+            if (type.getTypeCategory() == TypeCategory.PRIMITIVE) {
+                effectiveValue = attribute.getAttributeDef().getIsOptional() ? type.createOptionalDefaultValue() : type.createDefaultValue();
+            }
+        }
+
+        mapToVertexByTypeCategory(new AttributeMutationContext(op, vertex, attribute, effectiveValue), context);
+    }
+
+    private Object mapToVertexByTypeCategory(AttributeMutationContext ctx, EntityMutationContext context) throws AtlasBaseException {
+        if (ctx.getOp() == EntityMutations.EntityOperation.CREATE && ctx.getValue() == null) {
+            return null;
+        }
+
+        switch (ctx.getAttrType().getTypeCategory()) {
+            case PRIMITIVE:
+            case ENUM:
+                return mapPrimitiveValue(ctx);
+
+            case STRUCT: {
+                String    edgeLabel   = AtlasGraphUtilsV1.getEdgeLabel(ctx.getVertexProperty());
+                AtlasEdge currentEdge = graphHelper.getEdgeForLabel(ctx.getReferringVertex(), edgeLabel);
+                AtlasEdge edge        = currentEdge != null ? currentEdge : null;
+
+                ctx.setExistingEdge(edge);
+
+                AtlasEdge newEdge = mapStructValue(ctx, context);
+
+                if (currentEdge != null && !currentEdge.equals(newEdge)) {
+                    deleteHandler.deleteEdgeReference(currentEdge, ctx.getAttrType().getTypeCategory(), false, true);
+                }
+
+                return newEdge;
+            }
+
+            case ENTITY: {
+                String          edgeLabel    = AtlasGraphUtilsV1.getEdgeLabel(ctx.getVertexProperty());
+                AtlasEdge       currentEdge  = graphHelper.getEdgeForLabel(ctx.getReferringVertex(), edgeLabel);
+                AtlasEntityType instanceType = getInstanceType(ctx.getValue());
+                AtlasEdge       edge         = currentEdge != null ? currentEdge : null;
+
+                ctx.setElementType(instanceType);
+                ctx.setExistingEdge(edge);
+
+                AtlasEdge newEdge = mapEntityValue(ctx, context);
+
+                if (currentEdge != null && !currentEdge.equals(newEdge)) {
+                    deleteHandler.deleteEdgeReference(currentEdge, ctx.getAttrType().getTypeCategory(), ctx.getAttribute().isOwnedRef(), true);
+                }
+
+                return newEdge;
+            }
 
-        // add version information
-        AtlasGraphUtilsV1.setProperty(vertex, Constants.VERSION_PROPERTY_KEY, Integer.valueOf(entity.getVersion().intValue()));
+            case MAP:
+                return mapMapValue(ctx, context);
 
-        return vertex;
+            case ARRAY:
+                return mapArrayValue(ctx, context);
+
+            default:
+                throw new AtlasBaseException(AtlasErrorCode.TYPE_CATEGORY_INVALID, ctx.getAttrType().getTypeCategory().name());
+        }
+    }
+    /** Writes the context's value to its vertex property on the referring vertex and returns that value. */
+    private Object mapPrimitiveValue(AttributeMutationContext ctx) {
+        AtlasGraphUtilsV1.setProperty(ctx.getReferringVertex(), ctx.getVertexProperty(), ctx.getValue());
+
+        return ctx.getValue();
     }
 
+    private AtlasEdge mapStructValue(AttributeMutationContext ctx, EntityMutationContext context) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> mapStructValue({})", ctx);
+        }
+
+        AtlasEdge ret = null;
+
+        if (ctx.getCurrentEdge() != null) {
+            updateVertex((AtlasStruct) ctx.getValue(), ctx.getCurrentEdge().getInVertex(), context);
 
-    @Override
-    public AtlasEdge toGraph(GraphMutationContext ctx) throws AtlasBaseException {
-        AtlasEdge result = null;
+            ret = ctx.getCurrentEdge();
+        } else if (ctx.getValue() != null) {
+            String edgeLabel = AtlasGraphUtilsV1.getEdgeLabel(ctx.getVertexProperty());
+
+            if (ctx.getValue() instanceof AtlasStruct) {
+                ret = createVertex((AtlasStruct) ctx.getValue(), ctx.getReferringVertex(), edgeLabel, context);
+            } else if (ctx.getValue() instanceof Map) {
+                AtlasStruct stuct = new AtlasStruct(ctx.getAttrType().getTypeName(), (Map) AtlasTypeUtil.toStructAttributes((Map)ctx.getValue()));
+
+                ret = createVertex(stuct, ctx.getReferringVertex(), edgeLabel, context);
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== mapStructValue({})", ctx);
+        }
+
+        return ret;
+    }
+    /** Resolves the vertex the value refers to (by guid, then by object id) and updates the current edge or creates one to it; throws INVALID_OBJECT_ID when nothing resolves. */
+    private AtlasEdge mapEntityValue(AttributeMutationContext ctx, EntityMutationContext context) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> mapEntityValue({})", ctx);
+        }
+
+        AtlasEdge ret = null;
+
+        String guid = getGuid(ctx.getValue());
 
-        AtlasObjectId guid = getId(ctx.getValue());
         AtlasVertex entityVertex = context.getDiscoveryContext().getResolvedEntityVertex(guid);
-        if ( ctx.getCurrentEdge().isPresent() ) {
-            result = updateEdge(ctx.getAttributeDef(), ctx.getValue(), ctx.getCurrentEdge().get(), entityVertex);
+
+        if (entityVertex == null) {
+            AtlasObjectId objId = getObjectId(ctx.getValue());
+            // getObjectId() returns null for a null value; skip the lookup instead of resolving a null id
+            entityVertex = (objId != null) ? context.getDiscoveryContext().getResolvedEntityVertex(objId) : null;
+        }
+        // String.valueOf: a null value must surface as INVALID_OBJECT_ID, not as an NPE from toString()
+        if (entityVertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, String.valueOf(ctx.getValue()));
+        }
+
+        if (ctx.getCurrentEdge() != null) {
+            ret = updateEdge(ctx.getAttributeDef(), ctx.getValue(), ctx.getCurrentEdge(), entityVertex);
         } else if (ctx.getValue() != null) {
-            String edgeLabel = AtlasGraphUtilsV1.getEdgeLabel(ctx.getVertexPropertyKey());
+            String edgeLabel = AtlasGraphUtilsV1.getEdgeLabel(ctx.getVertexProperty());
+
             try {
-                result = graphHelper.getOrCreateEdge(ctx.getReferringVertex(), entityVertex, edgeLabel);
+                ret = graphHelper.getOrCreateEdge(ctx.getReferringVertex(), entityVertex, edgeLabel);
             } catch (RepositoryException e) {
                 throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
             }
         }
 
-        return result;
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== mapEntityValue({})", ctx);
+        }
+
+        return ret;
+    }
+    /** Maps a map-typed attribute: stores each entry under a per-key vertex property, prunes stale entries and records the key list. */
+    private Map<String, Object> mapMapValue(AttributeMutationContext ctx, EntityMutationContext context) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> mapMapValue({})", ctx);
+        }
+
+        @SuppressWarnings("unchecked")
+        Map<Object, Object> newVal  = (Map<Object, Object>) ctx.getValue();
+        Map<String, Object> newMap  = new HashMap<>();
+        AtlasMapType        mapType = (AtlasMapType) ctx.getAttrType();
+
+        try {
+            AtlasAttribute      attribute   = ctx.getAttribute();
+            List<String>        currentKeys = GraphHelper.getListProperty(ctx.getReferringVertex(), ctx.getVertexProperty());
+            Map<String, Object> currentMap  = new HashMap<>();
+            // rebuild the currently stored map from the key list and the per-key vertex properties
+            if (CollectionUtils.isNotEmpty(currentKeys)) {
+                for (String key : currentKeys) {
+                    String propertyNameForKey  = GraphHelper.getQualifiedNameForMapKey(ctx.getVertexProperty(), GraphHelper.encodePropertyKey(key));
+                    Object propertyValueForKey = getMapValueProperty(mapType.getValueType(), ctx.getReferringVertex(), propertyNameForKey);
+
+                    currentMap.put(key, propertyValueForKey);
+                }
+            }
+
+            if (MapUtils.isNotEmpty(newVal)) {
+                for (Map.Entry<Object, Object> entry : newVal.entrySet()) {
+                    String    key          = entry.getKey().toString();
+                    String    propertyName = GraphHelper.getQualifiedNameForMapKey(ctx.getVertexProperty(), GraphHelper.encodePropertyKey(key));
+                    AtlasEdge existingEdge = getEdgeIfExists(mapType, currentMap, key);
+
+                    AttributeMutationContext mapCtx =  new AttributeMutationContext(ctx.getOp(), ctx.getReferringVertex(), attribute, entry.getValue(), propertyName, mapType.getValueType(), existingEdge);
+
+                    // map the element, then store the mapped result under the per-key property
+                    Object newEntry = mapCollectionElementsToVertex(mapCtx, context);
+                    setMapValueProperty(mapType.getValueType(), ctx.getReferringVertex(), propertyName, newEntry);
+
+                    newMap.put(key, newEntry);
+                }
+            }
+
+            Map<String, Object> finalMap = removeUnusedMapEntries(attribute, ctx.getReferringVertex(), ctx.getVertexProperty(), currentMap, newMap);
+
+            Set<String> newKeys = new HashSet<>(newMap.keySet());
+            newKeys.addAll(finalMap.keySet());
+
+            // store the combined key list (new keys plus keys returned by removeUnusedMapEntries) for dereference on the way out
+            GraphHelper.setListProperty(ctx.getReferringVertex(), ctx.getVertexProperty(), new ArrayList<>(newKeys));
+        } catch (AtlasException e) {
+            throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== mapMapValue({})", ctx);
+        }
+
+        return newMap;
+    }
+    /** Maps an array-typed attribute: maps each new element, then stores the resulting element list on the referring vertex and returns it. */
+    public List mapArrayValue(AttributeMutationContext ctx, EntityMutationContext context) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> mapArrayValue({})", ctx);
+        }
+
+        AtlasAttribute attribute       = ctx.getAttribute();
+        List           newElements     = (List) ctx.getValue();
+        AtlasArrayType arrType         = (AtlasArrayType) attribute.getAttributeType();
+        AtlasType      elementType     = arrType.getElementType();
+        List<Object>   currentElements = getArrayElementsProperty(elementType, ctx.getReferringVertex(), ctx.getVertexProperty());
+
+        List<Object> newElementsCreated = new ArrayList<>();
+        // map each new element, pairing it with the edge currently stored at the same index (if any)
+        if (CollectionUtils.isNotEmpty(newElements)) {
+            for (int index = 0; index < newElements.size(); index++) {
+                AtlasEdge               existingEdge = getEdgeAt(currentElements, index, elementType);
+                AttributeMutationContext arrCtx      =  new AttributeMutationContext(ctx.getOp(), ctx.getReferringVertex(), ctx.getAttribute(), newElements.get(index),
+                                                                                     ctx.getVertexProperty(), elementType, existingEdge);
+
+                Object newEntry = mapCollectionElementsToVertex(arrCtx, context);
+
+                newElementsCreated.add(newEntry);
+            }
+        }
+        // reference elements only: append the edges returned by removeUnusedArrayEntries
+        if (AtlasGraphUtilsV1.isReference(elementType)) {
+            List<AtlasEdge> additionalEdges = removeUnusedArrayEntries(attribute, (List) currentElements, (List) newElementsCreated);
+            newElementsCreated.addAll(additionalEdges);
+        }
+
+        // for dereference on way out
+        setArrayElementsProperty(elementType, ctx.getReferringVertex(), ctx.getVertexProperty(), newElementsCreated);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== mapArrayValue({})", ctx);
+        }
+
+        return newElementsCreated;
+    }
+
+    /** Creates a vertex for {@code struct}, maps its attributes as CREATE and links it from {@code referringVertex}. */
+    private AtlasEdge createVertex(AtlasStruct struct, AtlasVertex referringVertex, String edgeLabel, EntityMutationContext context) throws AtlasBaseException {
+        final AtlasVertex structVertex = createStructVertex(struct);
+        final AtlasEdge   linkingEdge;
+        mapAttributes(struct, structVertex, CREATE, context);
+        try {
+            //TODO - Map directly in AtlasGraphUtilsV1
+            linkingEdge = graphHelper.getOrCreateEdge(referringVertex, structVertex, edgeLabel);
+        } catch (RepositoryException e) {
+            throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
+        }
+        return linkingEdge;
+    }
+    /** Applies {@code struct}'s attributes to an existing vertex by mapping them with the UPDATE operation. */
+    private void updateVertex(AtlasStruct struct, AtlasVertex vertex, EntityMutationContext context) throws AtlasBaseException {
+        mapAttributes(struct, vertex, UPDATE, context);
+    }
+    /** Sets the vertex's modification-timestamp and modified-by properties from the current request context. */
+    private void updateModificationMetadata(AtlasVertex vertex) {
+        AtlasGraphUtilsV1.setProperty(vertex, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContextV1.get().getRequestTime());
+        GraphHelper.setProperty(vertex, Constants.MODIFIED_BY_KEY, RequestContextV1.get().getUser());
+    }
+    /** Returns the entity's version narrowed to int, or 0 when the entity or its version is null. */
+    private int getEntityVersion(AtlasEntity entity) {
+        final Long version = (entity == null) ? null : entity.getVersion();
+
+        return (version == null) ? 0 : version.intValue();
+    }
+    /** Looks up {@code typeName} in the registry; throws TYPE_NAME_INVALID unless it resolves to a struct type. */
+    private AtlasStructType getStructType(String typeName) throws AtlasBaseException {
+        final AtlasType resolved = typeRegistry.getType(typeName);
+
+        if (resolved instanceof AtlasStructType) {
+            return (AtlasStructType) resolved;
+        }
+
+        throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, typeName);
+    }
+    /**
+     * Maps one element of a map/array attribute: primitives and enums pass through unchanged, structs
+     * and entities are mapped to edges, and any other element category is rejected.
+     */
+    private Object mapCollectionElementsToVertex(AttributeMutationContext ctx, EntityMutationContext context) throws AtlasBaseException {
+        final TypeCategory category = ctx.getAttrType().getTypeCategory();
+
+        if (category == TypeCategory.PRIMITIVE || category == TypeCategory.ENUM) {
+            return ctx.getValue();
+        }
+        if (category == TypeCategory.STRUCT) {
+            return mapStructValue(ctx, context);
+        }
+        if (category == TypeCategory.ENTITY) {
+            ctx.setElementType(getInstanceType(ctx.getValue()));
+            return mapEntityValue(ctx, context);
+        }
+
+        // MAP, ARRAY and any other category are rejected as collection elements
+        throw new AtlasBaseException(AtlasErrorCode.TYPE_CATEGORY_INVALID, ctx.getAttrType().getTypeCategory().name());
+    }
+    /** Converts an AtlasObjectId, AtlasEntity or Map to an AtlasObjectId; null stays null, anything else (or an invalid Map id) throws INVALID_OBJECT_ID. */
+    private static AtlasObjectId getObjectId(Object val) throws AtlasBaseException {
+        if (val == null) {
+            return null;
+        }
+        if (val instanceof AtlasObjectId) {
+            return (AtlasObjectId) val;
+        }
+        if (val instanceof AtlasEntity) {
+            return ((AtlasEntity) val).getAtlasObjectId();
+        }
+        if (val instanceof Map) {
+            final AtlasObjectId fromMap = new AtlasObjectId((Map) val);
+
+            if (fromMap.isValid()) {
+                return fromMap;
+            }
+        }
+        throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString());
+    }
+    /** Extracts the guid from an AtlasObjectId, AtlasEntity or Map (under KEY_GUID); null for anything else. */
+    private static String getGuid(Object val) throws AtlasBaseException {
+        if (val instanceof AtlasObjectId) {
+            return ((AtlasObjectId) val).getGuid();
+        }
+        if (val instanceof AtlasEntity) {
+            return ((AtlasEntity) val).getGuid();
+        }
+        if (val instanceof Map) {
+            final Object rawGuid = ((Map) val).get(AtlasObjectId.KEY_GUID);
+
+            return (rawGuid == null) ? null : rawGuid.toString();
+        }
+
+        return null;
+    }
+    /**
+     * Resolves the entity type named by an AtlasObjectId, AtlasEntity or Map (KEY_TYPENAME) value.
+     * Returns null for a null value; throws INVALID_OBJECT_ID when no entity type can be resolved.
+     */
+    private AtlasEntityType getInstanceType(Object val) throws AtlasBaseException {
+        if (val == null) {
+            return null;
+        }
+
+        String typeName = null;
+
+        if (val instanceof AtlasObjectId) {
+            typeName = ((AtlasObjectId) val).getTypeName();
+        } else if (val instanceof AtlasEntity) {
+            typeName = ((AtlasEntity) val).getTypeName();
+        } else if (val instanceof Map) {
+            final Object rawName = ((Map) val).get(AtlasObjectId.KEY_TYPENAME);
+
+            typeName = (rawName == null) ? null : rawName.toString();
+        }
+
+        final AtlasEntityType resolved = (typeName == null) ? null : typeRegistry.getEntityTypeByName(typeName);
+
+        if (resolved == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString());
+        }
+        return resolved;
+    }
+
+    /** Reads a map entry's value: an AtlasEdge lookup for reference value types, otherwise a String lookup (null if the lookup yields null). */
+    public static Object getMapValueProperty(AtlasType elementType, AtlasVertex vertex, String vertexPropertyName) {
+        if (AtlasGraphUtilsV1.isReference(elementType)) {
+            return vertex.getProperty(vertexPropertyName, AtlasEdge.class);
+        }
+
+        // the lookup is already typed by String.class; the former trailing toString() only added an NPE on a null result
+        return vertex.getProperty(vertexPropertyName, String.class);
+    }
+    /** Stores a map entry's value via setPropertyFromElementId for reference value types, via setProperty otherwise. */
+    private static void setMapValueProperty(AtlasType elementType, AtlasVertex vertex, String vertexPropertyName, Object value) {
+        final boolean storesEdge = AtlasGraphUtilsV1.isReference(elementType);
+        if (!storesEdge) {
+            vertex.setProperty(vertexPropertyName, value);
+        } else {
+            vertex.setPropertyFromElementId(vertexPropertyName, (AtlasEdge)value);
+        }
     }
 
-    @Override
-    public void cleanUp() throws AtlasBaseException {
+    // Deletes edge references and per-key properties for current entries missing from newMap; the returned map is never populated while the block below stays commented out
+    private Map<String, Object> removeUnusedMapEntries(AtlasAttribute attribute, AtlasVertex vertex, String propertyName,
+                                                       Map<String, Object> currentMap, Map<String, Object> newMap)
+                                                                             throws AtlasException, AtlasBaseException {
+        AtlasMapType        mapType       = (AtlasMapType) attribute.getAttributeType();
+        Map<String, Object> additionalMap = new HashMap<>();
+
+        for (String currentKey : currentMap.keySet()) {
+            boolean shouldDeleteKey = !newMap.containsKey(currentKey);
+
+            if (AtlasGraphUtilsV1.isReference(mapType.getValueType())) {
+                //Delete the edge reference if it's not part of new edges created/updated
+                AtlasEdge currentEdge = (AtlasEdge)currentMap.get(currentKey);
+
+                if (!newMap.values().contains(currentEdge)) {
+                    boolean deleted = deleteHandler.deleteEdgeReference(currentEdge, mapType.getValueType().getTypeCategory(), attribute.isOwnedRef(), true);
+                    // NOTE(review): 'deleted' is only read by the commented-out block below
+                    /* TODO: need to review the following 'if' block. Wouldn't this leave deleted keys in the map?
+                     *
+                    if (!deleted) {
+                        additionalMap.put(currentKey, currentEdge);
+                        shouldDeleteKey = false;
+                    }
+                    *
+                    */
+                }
+            }
+
+            if (shouldDeleteKey) {
+                String propertyNameForKey = GraphHelper.getQualifiedNameForMapKey(propertyName, GraphHelper.encodePropertyKey(currentKey));
+                GraphHelper.setProperty(vertex, propertyNameForKey, null);
+            }
+        }
+
+        return additionalMap;
     }
 
-    private AtlasEdge updateEdge(AtlasAttributeDef attributeDef, Object value,  AtlasEdge currentEdge, final AtlasVertex entityVertex) throws AtlasBaseException {
+    /**
+     * Looks up the edge currently stored for a map key.
+     *
+     * @return the value under {@code keyStr} cast to AtlasEdge when the map's value type is a
+     *         reference; null for non-reference value types or when the key has no value
+     */
+    private static AtlasEdge getEdgeIfExists(AtlasMapType mapType, Map<String, Object> currentMap, String keyStr) {
+        if (!AtlasGraphUtilsV1.isReference(mapType.getValueType())) {
+            return null;
+        }
+
+        return (AtlasEdge) currentMap.get(keyStr);
+    }
+
+    private AtlasEdge updateEdge(AtlasAttributeDef attributeDef, Object value, AtlasEdge currentEdge, final AtlasVertex entityVertex) throws AtlasBaseException {
 
         LOG.debug("Updating entity reference {} for reference attribute {}",  attributeDef.getName());
         // Update edge if it exists
@@ -132,60 +664,69 @@ public class EntityGraphMapper implements InstanceGraphMapper<AtlasEdge> {
         return newEdge;
     }
 
-    public EntityMutationResponse
-    mapAttributes(EntityMutationContext ctx) throws AtlasBaseException {
+    public static List<Object> getArrayElementsProperty(AtlasType elementType, AtlasVertex vertex, String vertexPropertyName) {
+        if (AtlasGraphUtilsV1.isReference(elementType)) { // reference elements: read the list property with AtlasEdge.class as the element class
+            return (List)vertex.getListProperty(vertexPropertyName, AtlasEdge.class); // raw (unchecked) cast to fit the List<Object> return type
+        }
+        else { // any other element type: read the list property without an element class
+            return (List)vertex.getListProperty(vertexPropertyName); // raw (unchecked) cast to fit the List<Object> return type
+        }
+    }
 
-        this.context = ctx;
-        structVertexMapper.init(this);
+    private AtlasEdge getEdgeAt(List<Object> currentElements, int index, AtlasType elemType) {
+        AtlasEdge ret = null;
 
-        EntityMutationResponse resp = new EntityMutationResponse();
-        //Map attributes
-        if (ctx.getCreatedEntities() != null) {
-            for (AtlasEntity createdEntity : ctx.getCreatedEntities()) {
-                AtlasVertex vertex = ctx.getVertex(createdEntity);
-                structVertexMapper.mapAttributestoVertex(EntityMutations.EntityOperation.CREATE, ctx.getType(createdEntity), createdEntity, vertex);
-                resp.addEntity(EntityMutations.EntityOperation.CREATE, constructHeader(createdEntity, ctx.getType(createdEntity), vertex));
+        if (AtlasGraphUtilsV1.isReference(elemType)) {
+            if (currentElements != null && index < currentElements.size()) {
+                ret = (AtlasEdge) currentElements.get(index);
             }
         }
 
-        if (ctx.getUpdatedEntities() != null) {
-            for (AtlasEntity updated : ctx.getUpdatedEntities()) {
-                AtlasVertex vertex = ctx.getVertex(updated);
-                structVertexMapper.mapAttributestoVertex(EntityMutations.EntityOperation.UPDATE, ctx.getType(updated), updated, vertex);
+        return ret;
+    }
 
-                resp.addEntity(EntityMutations.EntityOperation.UPDATE, constructHeader(updated, ctx.getType(updated), vertex));
-            }
-        }
+    //Removes unused edges from the old collection, compared to the new collection
+    private List<AtlasEdge> removeUnusedArrayEntries(AtlasAttribute attribute, List<AtlasEdge> currentEntries, List<AtlasEdge> newEntries) throws AtlasBaseException {
+        if (CollectionUtils.isNotEmpty(currentEntries)) {
+            AtlasStructType entityType = attribute.getDefinedInType();
+            AtlasType       entryType  = ((AtlasArrayType)attribute.getAttributeType()).getElementType();
 
-        RequestContextV1 req = RequestContextV1.get();
-        for (AtlasObjectId id : req.getDeletedEntityIds()) {
-            resp.addEntity(EntityMutations.EntityOperation.DELETE, constructHeader(id));
-        }
+            if (AtlasGraphUtilsV1.isReference(entryType)) {
+                Collection<AtlasEdge> edgesToRemove = CollectionUtils.subtract(currentEntries, newEntries);
 
-        return resp;
-    }
+                if (CollectionUtils.isNotEmpty(edgesToRemove)) {
+                    List<AtlasEdge> additionalElements = new ArrayList<>();
 
+                    for (AtlasEdge edge : edgesToRemove) {
+                        boolean deleted = deleteHandler.deleteEdgeReference(edge, entryType.getTypeCategory(), attribute.isOwnedRef(), true);
 
-    public AtlasObjectId getId(Object value) throws AtlasBaseException {
-        if (value != null) {
-            if ( value instanceof  AtlasObjectId) {
-                return ((AtlasObjectId) value);
-            } else if (value instanceof AtlasEntity) {
-                return ((AtlasEntity) value).getAtlasObjectId();
-            } else if (value instanceof Map) {
-                AtlasObjectId ret = new AtlasObjectId((Map)value);
+                        /* TODO: need to review the following 'if' block. Wouldn't this leave deleted elements in the array?
+                         *
+                        if (!deleted) {
+                            additionalElements.add(edge);
+                        }
+                         *
+                         */
+                    }
 
-                if (ret.isValid()) {
-                    return ret;
+                    return additionalElements;
                 }
             }
-
-            throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, (String) value);
         }
 
-        return null;
+        return Collections.emptyList();
+    }
+
+    private void setArrayElementsProperty(AtlasType elementType, AtlasVertex vertex, String vertexPropertyName, List<Object> values) {
+        if (AtlasGraphUtilsV1.isReference(elementType)) { // reference elements are written through setListPropertyFromElementIds
+            GraphHelper.setListPropertyFromElementIds(vertex, vertexPropertyName, (List) values); // NOTE(review): presumably stores element ids rather than the objects - confirm in GraphHelper
+        }
+        else { // any other element type: the list is set directly as the vertex property value
+            GraphHelper.setProperty(vertex, vertexPropertyName, values);
+        }
     }
 
+
     private AtlasEntityHeader constructHeader(AtlasEntity entity, final AtlasEntityType type, AtlasVertex vertex) {
         //TODO - enhance to return only selective attributes
         AtlasEntityHeader header = new AtlasEntityHeader(entity.getTypeName(), AtlasGraphUtilsV1.getIdFromVertex(vertex), entity.getAttributes());
@@ -211,18 +752,4 @@ public class EntityGraphMapper implements InstanceGraphMapper<AtlasEdge> {
 
         return entity;
     }
-
-    public EntityMutationContext getContext() {
-        return context;
-    }
-
-    public AtlasEntityType getInstanceType(Object val) throws AtlasBaseException {
-        AtlasObjectId guid = getId(val);
-
-        if ( guid != null) {
-            return (AtlasEntityType) getContext().getType(guid);
-        }
-
-        return null;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
index 255c52c..847e5fe 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
@@ -22,6 +22,7 @@ import org.apache.atlas.AtlasException;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasObjectId;
@@ -100,6 +101,22 @@ public final class EntityGraphRetriever {
         return ret;
     }
 
+    public AtlasEntitiesWithExtInfo toAtlasEntitiesWithExtInfo(List<String> guids) throws AtlasBaseException {
+        AtlasEntitiesWithExtInfo ret = new AtlasEntitiesWithExtInfo();
+        // Resolve each guid to its vertex, map it to an AtlasEntity and add it to ret; guids is iterated without a null check.
+        for (String guid : guids) {
+            AtlasVertex vertex = getEntityVertex(guid);
+            // ret is also handed to the mapper - presumably so referred entities are collected in it (TODO confirm).
+            AtlasEntity entity = mapVertexToAtlasEntity(vertex, ret);
+
+            ret.addEntity(entity);
+        }
+        // compact() runs once, after all guids are processed - NOTE(review): its exact effect is not visible here; confirm.
+        ret.compact();
+
+        return ret;
+    }
+
     private AtlasVertex getEntityVertex(String guid) throws AtlasBaseException {
         try {
             return graphHelper.getVertexForGUID(guid);
@@ -405,7 +422,7 @@ public final class EntityGraphRetriever {
                         ret = entity.getAtlasObjectId();
                     }
                 } else {
-                    ret = new AtlasObjectId(GraphHelper.getTypeName(referenceVertex), GraphHelper.getGuid(referenceVertex));
+                    ret = new AtlasObjectId(GraphHelper.getGuid(referenceVertex), GraphHelper.getTypeName(referenceVertex));
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityMutationContext.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityMutationContext.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityMutationContext.java
index 310b455..f24cab3 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityMutationContext.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityMutationContext.java
@@ -24,32 +24,37 @@ import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasType;
+import org.apache.commons.lang.StringUtils;
 
 import java.util.*;
 
 public class EntityMutationContext {
-    private final EntityGraphDiscoveryContext         context;
-    private final List<AtlasEntity>                   entitiesCreated = new ArrayList<>();
-    private final List<AtlasEntity>                   entitiesUpdated = new ArrayList<>();
-    private final Map<AtlasObjectId, AtlasEntityType> entityVsType    = new HashMap<>();
-    private final Map<AtlasObjectId, AtlasVertex>     entityVsVertex  = new HashMap<>();
+    private final EntityGraphDiscoveryContext  context;
+    private final List<AtlasEntity>            entitiesCreated = new ArrayList<>();
+    private final List<AtlasEntity>            entitiesUpdated = new ArrayList<>();
+    private final Map<String, AtlasEntityType> entityVsType    = new HashMap<>();
+    private final Map<String, AtlasVertex>     entityVsVertex  = new HashMap<>();
+    private final Map<String, String>          guidAssignments = new HashMap<>();
 
     public EntityMutationContext(final EntityGraphDiscoveryContext context) {
         this.context = context;
     }
 
-    public void addCreated(AtlasEntity entity, AtlasEntityType type, AtlasVertex atlasVertex) {
-        AtlasObjectId objId = entity.getAtlasObjectId();
+    public void addCreated(String internalGuid, AtlasEntity entity, AtlasEntityType type, AtlasVertex atlasVertex) {
         entitiesCreated.add(entity);
-        entityVsType.put(objId, type);
-        entityVsVertex.put(objId, atlasVertex);
+        entityVsType.put(entity.getGuid(), type);
+        entityVsVertex.put(entity.getGuid(), atlasVertex);
+
+        if (!StringUtils.equals(internalGuid, entity.getGuid())) {
+            guidAssignments.put(internalGuid, entity.getGuid());
+            entityVsVertex.put(internalGuid, atlasVertex);
+        }
     }
 
     public void addUpdated(AtlasEntity entity, AtlasEntityType type, AtlasVertex atlasVertex) {
-        AtlasObjectId objId = entity.getAtlasObjectId();
         entitiesUpdated.add(entity);
-        entityVsType.put(objId, type);
-        entityVsVertex.put(objId, atlasVertex);
+        entityVsType.put(entity.getGuid(), type);
+        entityVsVertex.put(entity.getGuid(), atlasVertex);
     }
 
     public EntityGraphDiscoveryContext getDiscoveryContext() {
@@ -64,21 +69,15 @@ public class EntityMutationContext {
         return entitiesUpdated;
     }
 
-    public AtlasEntityType getType(AtlasEntity entity) {
-        return entityVsType.get(entity.getAtlasObjectId());
-    }
-
-    public AtlasType getType(AtlasObjectId entityId) {
-        return entityVsType.get(entityId);
+    public Map<String, String> getGuidAssignments() {
+        return guidAssignments;
     }
 
-    public AtlasVertex getVertex(AtlasEntity entity) {
-        return entityVsVertex.get(entity.getAtlasObjectId());
+    public AtlasEntityType getType(String guid) {
+        return entityVsType.get(guid);
     }
 
-    public AtlasVertex getVertex(AtlasObjectId entityId) {
-        return entityVsVertex.get(entityId);
-    }
+    public AtlasVertex getVertex(String guid) { return entityVsVertex.get(guid); }
 
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityStream.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityStream.java
index 1d939fe..4c43921 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityStream.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityStream.java
@@ -28,5 +28,5 @@ public interface EntityStream {
 
     void reset();
 
-    AtlasEntity getById(AtlasObjectId id);
+    AtlasEntity getByGuid(String guid);
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/GraphMutationContext.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/GraphMutationContext.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/GraphMutationContext.java
deleted file mode 100644
index 7dbedc7..0000000
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/GraphMutationContext.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.repository.store.graph.v1;
-
-
-import com.google.common.base.Optional;
-import org.apache.atlas.model.instance.EntityMutations;
-import org.apache.atlas.model.typedef.AtlasStructDef;
-import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
-import org.apache.atlas.repository.graphdb.AtlasEdge;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
-import org.apache.atlas.type.AtlasStructType;
-import org.apache.atlas.type.AtlasType;
-
-import java.util.Objects;
-
-public class GraphMutationContext {
-
-
-    private EntityMutations.EntityOperation op;
-    /**
-     * Atlas Attribute
-     */
-
-    private AtlasStructType.AtlasAttribute attribute;
-
-    /**
-     * Overriding type for which elements are being mapped
-     */
-    private AtlasType currentElementType;
-
-    /**
-     * Current attribute value/entity/Struct instance
-     */
-    private Object value;
-
-    /**
-     *
-     * The vertex which corresponds to the entity/struct for which we are mapping a complex attributes like struct, traits
-     */
-    AtlasVertex referringVertex;
-
-    /**
-     * the vertex property that we are updating
-     */
-
-    String vertexPropertyKey;
-
-    /**
-     * The current edge(in case of updates) from the parent entity/struct to the complex attribute like struct, trait
-     */
-    Optional<AtlasEdge> existingEdge;
-
-
-    private GraphMutationContext(final Builder builder) {
-        this.op = builder.op;
-        this.attribute = builder.attribute;
-        this.currentElementType = builder.elementType;
-        this.existingEdge = builder.currentEdge;
-        this.value = builder.currentValue;
-        this.referringVertex = builder.referringVertex;
-        this.vertexPropertyKey = builder.vertexPropertyKey;
-    }
-
-    public String getVertexPropertyKey() {
-        return vertexPropertyKey;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(op, attribute, value, referringVertex, vertexPropertyKey, existingEdge);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (obj == null) {
-            return false;
-        } else if (obj == this) {
-            return true;
-        } else if (obj.getClass() != getClass()) {
-            return false;
-        } else {
-            GraphMutationContext rhs = (GraphMutationContext) obj;
-            return Objects.equals(attribute, rhs.getAttribute())
-                 && Objects.equals(value, rhs.getValue())
-                 && Objects.equals(referringVertex, rhs.getReferringVertex())
-                 && Objects.equals(vertexPropertyKey, rhs.getReferringVertex())
-                 && Objects.equals(existingEdge, rhs.getCurrentEdge())
-                 && Objects.equals(op, rhs.getOp());
-        }
-    }
-
-
-    public static final class Builder {
-
-        private final EntityMutations.EntityOperation op;
-
-        private final AtlasStructType.AtlasAttribute attribute;
-
-        private final AtlasType elementType;
-
-        private final Object currentValue;
-
-        private AtlasVertex referringVertex;
-
-        private Optional<AtlasEdge> currentEdge = Optional.absent();
-
-        private  String vertexPropertyKey;
-
-
-        public Builder(EntityMutations.EntityOperation op, AtlasStructType.AtlasAttribute attribute, AtlasType currentElementType, Object currentValue) {
-            this.op = op;
-            this.attribute = attribute;
-            this.elementType = currentElementType;
-            this.currentValue = currentValue;
-        }
-
-        public Builder(EntityMutations.EntityOperation op, AtlasStructType.AtlasAttribute attribute, Object currentValue) {
-           this(op, attribute, null, currentValue);
-        }
-
-        Builder referringVertex(AtlasVertex referringVertex) {
-            this.referringVertex = referringVertex;
-            return this;
-        }
-
-        Builder edge(AtlasEdge edge) {
-            this.currentEdge = Optional.of(edge);
-            return this;
-        }
-
-        Builder edge(Optional<AtlasEdge> edge) {
-            this.currentEdge = edge;
-            return this;
-        }
-
-        Builder vertexProperty(String propertyKey) {
-            this.vertexPropertyKey = propertyKey;
-            return this;
-        }
-
-        GraphMutationContext build() {
-            return new GraphMutationContext(this);
-        }
-    }
-
-    public AtlasStructType getParentType() {
-        return attribute.getDefinedInType();
-    }
-
-    public AtlasStructDef getStructDef() {
-        return attribute.getDefinedInDef();
-    }
-
-    public AtlasAttributeDef getAttributeDef() {
-        return attribute.getAttributeDef();
-    }
-
-    public AtlasType getAttrType() {
-        return currentElementType == null ? attribute.getAttributeType() : currentElementType;
-    }
-
-    public AtlasType getCurrentElementType() {
-        return currentElementType;
-    }
-
-    public Object getValue() {
-        return value;
-    }
-
-    public AtlasVertex getReferringVertex() {
-        return referringVertex;
-    }
-
-    public Optional<AtlasEdge> getCurrentEdge() {
-        return existingEdge;
-    }
-
-    public void setElementType(final AtlasType attrType) {
-        this.currentElementType = attrType;
-    }
-
-    public AtlasStructType.AtlasAttribute getAttribute() {
-        return attribute;
-    }
-
-    public EntityMutations.EntityOperation getOp() {
-        return op;
-    }
-
-    public void setExistingEdge(final Optional<AtlasEdge> existingEdge) {
-        this.existingEdge = existingEdge;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/IDBasedEntityResolver.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/IDBasedEntityResolver.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/IDBasedEntityResolver.java
index 2ffd10e..41ea75b 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/IDBasedEntityResolver.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/IDBasedEntityResolver.java
@@ -17,11 +17,9 @@
  */
 package org.apache.atlas.repository.store.graph.v1;
 
-import com.google.common.base.Optional;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
@@ -29,93 +27,53 @@ import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
 import org.apache.atlas.repository.store.graph.EntityResolver;
 import org.apache.atlas.typesystem.exception.EntityNotFoundException;
 import org.apache.atlas.typesystem.persistence.Id;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 
 public class IDBasedEntityResolver implements EntityResolver {
+    private static final Logger LOG = LoggerFactory.getLogger(IDBasedEntityResolver.class);
 
-    private final GraphHelper              graphHelper   = GraphHelper.getInstance();
-    private final Map<String, AtlasEntity> idToEntityMap = new HashMap<>();
-    private EntityGraphDiscoveryContext    context;
 
-    @Override
-    public void init(EntityGraphDiscoveryContext context) throws AtlasBaseException {
-        this.context = context;
+    private final GraphHelper graphHelper = GraphHelper.getInstance();
 
-        for (AtlasEntity entity : context.getRootEntities()) {
-            idToEntityMap.put(entity.getGuid(), entity);
-        }
-    }
 
-    public EntityGraphDiscoveryContext resolveEntityReferences() throws AtlasBaseException {
+    public EntityGraphDiscoveryContext resolveEntityReferences(EntityGraphDiscoveryContext context) throws AtlasBaseException {
         if (context == null) {
-            throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "Entity resolver not initialized");
+            throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "IDBasedEntityResolver.resolveEntityReferences(): context is null");
         }
 
-        List<AtlasObjectId> resolvedReferences = new ArrayList<>();
+        EntityStream entityStream = context.getEntityStream();
 
-        for (AtlasObjectId objId : context.getUnresolvedIds()) {
-            if (objId.isAssignedGuid()) {
-                //validate in graph repo that given guid, typename exists
-                Optional<AtlasVertex> vertex = resolveGuid(objId);
+        for (String guid : context.getReferencedGuids()) {
+            if (AtlasEntity.isAssigned(guid)) { // validate in graph repo that given guid exists
+                AtlasVertex vertex = resolveGuid(guid);
 
-                if (vertex.isPresent()) {
-                    context.addResolvedId(objId, vertex.get());
-                    resolvedReferences.add(objId);
-                }
+                context.addResolvedGuid(guid, vertex);
+            } else  if (entityStream.getByGuid(guid) != null) { //check if entity stream has this reference id
+                context.addLocalGuidReference(guid);
             } else {
-                //check if root references have this temporary id
-               if (!idToEntityMap.containsKey(objId.getGuid()) ) {
-                   throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, objId.toString());
-               }
-                resolvedReferences.add(objId);
-            }
-
-        }
-
-        context.removeUnResolvedIds(resolvedReferences);
-
-        //Resolve root references
-        for (AtlasEntity entity : context.getRootEntities()) {
-            AtlasObjectId objId = entity.getAtlasObjectId();
-
-            if (!context.isResolvedId(objId) && AtlasEntity.isAssigned(entity.getGuid())) {
-                Optional<AtlasVertex> vertex = resolveGuid(objId);
-
-                if (vertex.isPresent()) {
-                    context.addResolvedId(objId, vertex.get());
-                    context.removeUnResolvedId(objId);
-                }
+                throw new AtlasBaseException(AtlasErrorCode.REFERENCED_ENTITY_NOT_FOUND, guid);
             }
         }
 
         return context;
     }
 
-    private Optional<AtlasVertex> resolveGuid(AtlasObjectId objId) throws AtlasBaseException {
+    private AtlasVertex resolveGuid(String guid) throws AtlasBaseException {
         //validate in graph repo that an ACTIVE vertex with the given guid exists
         AtlasVertex vertex = null;
         try {
-            vertex = graphHelper.findVertex(Constants.GUID_PROPERTY_KEY, objId.getGuid(),
-                Constants.TYPE_NAME_PROPERTY_KEY, objId.getTypeName(),
-                Constants.STATE_PROPERTY_KEY, Id.EntityState.ACTIVE.name());
+            vertex = graphHelper.findVertex(Constants.GUID_PROPERTY_KEY, guid,
+                                            Constants.STATE_PROPERTY_KEY, Id.EntityState.ACTIVE.name());
         } catch (EntityNotFoundException e) {
             //Ignore
         }
-        if ( vertex != null ) {
-            return Optional.of(vertex);
+
+        if (vertex != null) {
+            return vertex;
         } else {
-            throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, objId.getGuid());
+            throw new AtlasBaseException(AtlasErrorCode.REFERENCED_ENTITY_NOT_FOUND, guid);
         }
     }
-
-    @Override
-    public void cleanUp() throws AtlasBaseException {
-        idToEntityMap.clear();
-        this.context = null;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InMemoryMapEntityStream.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InMemoryMapEntityStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InMemoryMapEntityStream.java
index 0d0b949..241f6d0 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InMemoryMapEntityStream.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InMemoryMapEntityStream.java
@@ -27,14 +27,11 @@ import java.util.Map;
 
 public class InMemoryMapEntityStream implements EntityStream {
 
-    private final Map<AtlasObjectId, AtlasEntity>                 entities = new HashMap<>();
-    private       Iterator<Map.Entry<AtlasObjectId, AtlasEntity>> iterator;
-
-    public InMemoryMapEntityStream(Map<String, AtlasEntity> entityMap) {
-        for (AtlasEntity entity : entityMap.values()) {
-            entities.put(entity.getAtlasObjectId(), entity);
-        }
+    private final Map<String, AtlasEntity>                 entities;
+    private       Iterator<Map.Entry<String, AtlasEntity>> iterator;
 
+    public InMemoryMapEntityStream(Map<String, AtlasEntity> entities) {
+        this.entities = entities;
         this.iterator = entities.entrySet().iterator();
     }
 
@@ -54,7 +51,7 @@ public class InMemoryMapEntityStream implements EntityStream {
     }
 
     @Override
-    public AtlasEntity getById(final AtlasObjectId id) {
-        return entities.get(id);
+    public AtlasEntity getByGuid(final String guid) {
+        return entities.get(guid);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InstanceGraphMapper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InstanceGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InstanceGraphMapper.java
index 7e87d39..3cae8a5 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InstanceGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InstanceGraphMapper.java
@@ -18,9 +18,6 @@
 package org.apache.atlas.repository.store.graph.v1;
 
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
-import org.apache.atlas.type.AtlasEntityType;
 
 public interface InstanceGraphMapper<T> {
 
@@ -31,9 +28,5 @@ public interface InstanceGraphMapper<T> {
      * @return the value that was mapped to the vertex
      * @throws AtlasBaseException
      */
-    T toGraph(GraphMutationContext ctx) throws AtlasBaseException;
-
-
-    void cleanUp() throws AtlasBaseException;
-
+    T toGraph(AttributeMutationContext ctx, EntityMutationContext context) throws AtlasBaseException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/MapVertexMapper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/MapVertexMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/MapVertexMapper.java
deleted file mode 100644
index bb7aeb6..0000000
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/MapVertexMapper.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.repository.store.graph.v1;
-
-
-import com.google.common.base.Optional;
-import com.google.inject.Inject;
-import org.apache.atlas.AtlasErrorCode;
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
-import org.apache.atlas.repository.graph.GraphHelper;
-import org.apache.atlas.repository.graphdb.AtlasEdge;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
-import org.apache.atlas.type.AtlasMapType;
-import org.apache.atlas.type.AtlasStructType;
-import org.apache.atlas.type.AtlasType;
-import org.apache.commons.collections.MapUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.atlas.repository.graph.GraphHelper.string;
-
-public class MapVertexMapper implements InstanceGraphMapper<Map> {
-
-    private DeleteHandlerV1 deleteHandler;
-
-    private static final Logger LOG = LoggerFactory.getLogger(MapVertexMapper.class);
-
-    private StructVertexMapper structVertexMapper;
-
-    @Inject
-    public MapVertexMapper(DeleteHandlerV1 deleteHandler) {
-        this.deleteHandler = deleteHandler;
-    }
-
-    void init(StructVertexMapper structVertexMapper) {
-        this.structVertexMapper = structVertexMapper;
-    }
-
-    @Override
-    public Map<String, Object> toGraph(GraphMutationContext ctx) throws AtlasBaseException {
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Mapping instance to vertex {} for map type {}", string(ctx.getReferringVertex()), ctx.getAttrType().getTypeName());
-        }
-
-        @SuppressWarnings("unchecked") Map<Object, Object> newVal =
-            (Map<Object, Object>) ctx.getValue();
-
-        boolean newAttributeEmpty = MapUtils.isEmpty(newVal);
-
-        Map<String, Object> currentMap = new HashMap<>();
-        Map<String, Object> newMap = new HashMap<>();
-
-        AtlasMapType mapType = (AtlasMapType) ctx.getAttrType();
-
-        try {
-            List<String> currentKeys = GraphHelper.getListProperty(ctx.getReferringVertex(), ctx.getVertexPropertyKey());
-            if (currentKeys != null && !currentKeys.isEmpty()) {
-                for (String key : currentKeys) {
-                    String propertyNameForKey = GraphHelper.getQualifiedNameForMapKey(ctx.getVertexPropertyKey(), key);
-                    Object propertyValueForKey = getMapValueProperty(mapType.getValueType(), ctx.getReferringVertex(), propertyNameForKey);
-                    currentMap.put(key, propertyValueForKey);
-                }
-            }
-
-            if (!newAttributeEmpty) {
-                for (Map.Entry<Object, Object> entry : newVal.entrySet()) {
-                    String keyStr = entry.getKey().toString();
-                    String propertyNameForKey = GraphHelper.getQualifiedNameForMapKey(ctx.getVertexPropertyKey(), keyStr);
-                    Optional<AtlasEdge> existingEdge = getEdgeIfExists(mapType, currentMap, keyStr);
-
-                    GraphMutationContext mapCtx =  new GraphMutationContext.Builder(ctx.getOp(), ctx.getAttribute(), mapType.getValueType(), entry.getValue())
-                        .referringVertex(ctx.getReferringVertex())
-                        .edge(existingEdge)
-                        .vertexProperty(propertyNameForKey).build();
-
-
-                    //Add/Update/Remove property value
-                    Object newEntry = structVertexMapper.mapCollectionElementsToVertex(mapCtx);
-                    MapVertexMapper.setMapValueProperty(mapType.getValueType(), ctx.getReferringVertex(), propertyNameForKey, newEntry);
-
-                    newMap.put(keyStr, newEntry);
-                }
-            }
-
-            Map<String, Object> finalMap =
-                removeUnusedMapEntries(ctx.getParentType(), mapType, ctx.getAttributeDef(), ctx.getReferringVertex(), ctx.getVertexPropertyKey(), currentMap, newMap);
-
-            Set<String> newKeys = new HashSet<>(newMap.keySet());
-            newKeys.addAll(finalMap.keySet());
-
-            // for dereference on way out
-            GraphHelper.setListProperty(ctx.getReferringVertex(), ctx.getVertexPropertyKey(), new ArrayList<>(newKeys));
-        } catch (AtlasException e) {
-            throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
-        }
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Map values set in vertex {} {}", mapType.getTypeName(), newMap);
-        }
-
-        return newMap;
-    }
-
-    @Override
-    public void cleanUp() throws AtlasBaseException {
-    }
-
-
-    public static Object getMapValueProperty(AtlasType elementType, AtlasVertex instanceVertex, String propertyName) {
-        String actualPropertyName = GraphHelper.encodePropertyKey(propertyName);
-        if (AtlasGraphUtilsV1.isReference(elementType)) {
-            return instanceVertex.getProperty(actualPropertyName, AtlasEdge.class);
-        }
-        else {
-            return instanceVertex.getProperty(actualPropertyName, String.class).toString();
-        }
-    }
-
-    public static void setMapValueProperty(AtlasType elementType, AtlasVertex instanceVertex, String propertyName, Object value) {
-        String actualPropertyName = GraphHelper.encodePropertyKey(propertyName);
-        if (AtlasGraphUtilsV1.isReference(elementType)) {
-            instanceVertex.setPropertyFromElementId(actualPropertyName, (AtlasEdge)value);
-        }
-        else {
-            instanceVertex.setProperty(actualPropertyName, value);
-        }
-    }
-
-    //Remove unused entries from map
-    private Map<String, Object> removeUnusedMapEntries(
-        AtlasStructType entityType,
-        AtlasMapType mapType, AtlasAttributeDef attributeDef,
-        AtlasVertex instanceVertex, String propertyName,
-        Map<String, Object> currentMap,
-        Map<String, Object> newMap)
-        throws AtlasException, AtlasBaseException {
-
-        Map<String, Object> additionalMap = new HashMap<>();
-        for (String currentKey : currentMap.keySet()) {
-
-            boolean shouldDeleteKey = !newMap.containsKey(currentKey);
-            if (AtlasGraphUtilsV1.isReference(mapType.getValueType())) {
-
-                //Delete the edge reference if its not part of new edges created/updated
-                AtlasEdge currentEdge = (AtlasEdge)currentMap.get(currentKey);
-
-                if (!newMap.values().contains(currentEdge)) {
-                    boolean deleteChildReferences = StructVertexMapper.shouldManageChildReferences(entityType, attributeDef.getName());
-                    boolean deleted =
-                        deleteHandler.deleteEdgeReference(currentEdge, mapType.getValueType().getTypeCategory(), deleteChildReferences, true);
-                    if (!deleted) {
-                        additionalMap.put(currentKey, currentEdge);
-                        shouldDeleteKey = false;
-                    }
-                }
-            }
-
-            if (shouldDeleteKey) {
-                String propertyNameForKey = GraphHelper.getQualifiedNameForMapKey(propertyName, currentKey);
-                GraphHelper.setProperty(instanceVertex, propertyNameForKey, null);
-            }
-        }
-        return additionalMap;
-    }
-    
-    private Optional<AtlasEdge> getEdgeIfExists(AtlasMapType mapType, Map<String, Object> currentMap, String keyStr) {
-        Optional<AtlasEdge> existingEdge = Optional.absent();
-        if ( AtlasGraphUtilsV1.isReference(mapType.getValueType()) ) {
-            if ( currentMap.get(keyStr) != null) {
-                existingEdge = Optional.of((AtlasEdge) currentMap.get(keyStr));
-            }
-        }
-        
-        return existingEdge;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/StructVertexMapper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/StructVertexMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/StructVertexMapper.java
deleted file mode 100644
index edcc12d..0000000
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/StructVertexMapper.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.repository.store.graph.v1;
-
-import com.google.common.base.Optional;
-import org.apache.atlas.AtlasErrorCode;
-import org.apache.atlas.RequestContextV1;
-import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.TypeCategory;
-import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.instance.AtlasStruct;
-import org.apache.atlas.model.instance.EntityMutations;
-import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
-import org.apache.atlas.repository.Constants;
-import org.apache.atlas.repository.RepositoryException;
-import org.apache.atlas.repository.graph.AtlasGraphProvider;
-import org.apache.atlas.repository.graph.GraphHelper;
-import org.apache.atlas.repository.graphdb.AtlasEdge;
-import org.apache.atlas.repository.graphdb.AtlasGraph;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
-import org.apache.atlas.type.AtlasEntityType;
-import org.apache.atlas.type.AtlasStructType;
-import org.apache.atlas.type.AtlasType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-public class StructVertexMapper implements InstanceGraphMapper<AtlasEdge> {
-
-    private final AtlasGraph graph;
-
-    private final GraphHelper graphHelper = GraphHelper.getInstance();
-
-    private final MapVertexMapper mapVertexMapper;
-
-    private final ArrayVertexMapper arrVertexMapper;
-
-    private EntityGraphMapper entityVertexMapper;
-
-    private DeleteHandlerV1 deleteHandler;
-
-    private static final Logger LOG = LoggerFactory.getLogger(StructVertexMapper.class);
-
-    public StructVertexMapper(ArrayVertexMapper arrayVertexMapper, MapVertexMapper mapVertexMapper, DeleteHandlerV1 deleteHandler) {
-        this.graph = AtlasGraphProvider.getGraphInstance();;
-        this.mapVertexMapper = mapVertexMapper;
-        this.arrVertexMapper = arrayVertexMapper;
-        this.deleteHandler = deleteHandler;
-    }
-
-    void init(final EntityGraphMapper entityVertexMapper) {
-        this.entityVertexMapper = entityVertexMapper;
-    }
-
-    @Override
-    public AtlasEdge toGraph(GraphMutationContext ctx) throws AtlasBaseException {
-        AtlasEdge ret = null;
-
-        if ( ctx.getCurrentEdge().isPresent() ) {
-            updateVertex(ctx.getParentType(), (AtlasStructType) ctx.getAttrType(), ctx.getAttributeDef(), (AtlasStruct) ctx.getValue(), ctx.getCurrentEdge().get().getInVertex());
-            ret = ctx.getCurrentEdge().get();
-        } else if (ctx.getValue() != null) {
-            String edgeLabel = AtlasGraphUtilsV1.getEdgeLabel(ctx.getVertexPropertyKey());
-            ret = createVertex(ctx.getParentType(), (AtlasStructType) ctx.getAttrType(), ctx.getAttributeDef(), (AtlasStruct) ctx.getValue(), ctx.getReferringVertex(), edgeLabel);
-        }
-
-        return ret;
-    }
-
-    @Override
-    public void cleanUp() throws AtlasBaseException {
-    }
-
-    public static boolean shouldManageChildReferences(AtlasStructType type, String attributeName) {
-        AtlasStructType.AtlasAttribute attribute = type.getAttribute(attributeName);
-
-        return attribute != null ? attribute.isOwnedRef() : false;
-    }
-
-    /**
-     * Map attributes for entity, struct or trait
-     *
-     * @param op
-     * @param structType
-     * @param struct
-     * @param vertex
-     * @return
-     * @throws AtlasBaseException
-     */
-    public AtlasVertex mapAttributestoVertex(final EntityMutations.EntityOperation op, AtlasStructType structType, AtlasStruct struct, AtlasVertex vertex) throws AtlasBaseException {
-        if (struct.getAttributes() != null) {
-            if (op.equals(EntityMutations.EntityOperation.CREATE)) {
-                final Map<String, AtlasStructType.AtlasAttribute> allAttributes = structType.getAllAttributes();
-                for (String attrName : allAttributes.keySet()) {
-                    Object value = struct.getAttribute(attrName);
-
-                    mapAttribute(op, structType, attrName, value, vertex);
-                }
-            } else if (op.equals(EntityMutations.EntityOperation.UPDATE)) {
-                for (String attrName : struct.getAttributes().keySet()) {
-                    Object value = struct.getAttribute(attrName);
-                    mapAttribute(op, structType, attrName, value, vertex);
-                }
-            }
-            updateModificationMetadata(vertex);
-        }
-        return vertex;
-    }
-
-    private void mapAttribute(final EntityMutations.EntityOperation op, AtlasStructType structType, String attrName, Object value, AtlasVertex vertex) throws AtlasBaseException {
-        AtlasType attributeType = structType.getAttributeType(attrName);
-        if (attributeType != null) {
-            final AtlasStructType.AtlasAttribute attribute = structType.getAttribute(attrName);
-
-            if (value == null) {
-                if ( attribute.getAttributeType().getTypeCategory() == TypeCategory.PRIMITIVE) {
-                    if ( attribute.getAttributeDef().getIsOptional()) {
-                        value = attribute.getAttributeType().createOptionalDefaultValue();
-                    } else {
-                        value = attribute.getAttributeType().createDefaultValue();
-                    }
-                }
-            }
-
-            final String vertexProperty = structType.getQualifiedAttributeName(attrName);
-            GraphMutationContext ctx = new GraphMutationContext.Builder(op, attribute, value)
-                .referringVertex(vertex)
-                .vertexProperty(GraphHelper.encodePropertyKey(vertexProperty)).build();
-            mapToVertexByTypeCategory(ctx);
-        }
-    }
-
-    private void updateModificationMetadata(AtlasVertex vertex) {
-        //Set updated timestamp
-        AtlasGraphUtilsV1.setProperty(vertex, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContextV1.get().getRequestTime());
-        GraphHelper.setProperty(vertex, Constants.MODIFIED_BY_KEY, RequestContextV1.get().getUser());
-    }
-
-
-    protected Object mapToVertexByTypeCategory(GraphMutationContext ctx) throws AtlasBaseException {
-        if (ctx.getOp() == EntityMutations.EntityOperation.CREATE && ctx.getValue() == null) {
-            return null;
-        }
-
-        switch (ctx.getAttrType().getTypeCategory()) {
-        case PRIMITIVE:
-        case ENUM:
-            return primitivesToVertex(ctx);
-        case STRUCT:
-            String edgeLabel = AtlasGraphUtilsV1.getEdgeLabel(ctx.getVertexPropertyKey());
-            AtlasEdge currentEdge = graphHelper.getEdgeForLabel(ctx.getReferringVertex(), edgeLabel);
-            Optional<AtlasEdge> edge = currentEdge != null ? Optional.of(currentEdge) : Optional.<AtlasEdge>absent();
-            ctx.setExistingEdge(edge);
-            AtlasEdge newEdge = toGraph(ctx);
-
-            if (currentEdge != null && !currentEdge.equals(newEdge)) {
-                deleteHandler.deleteEdgeReference(currentEdge, ctx.getAttrType().getTypeCategory(), false, true);
-            }
-            return newEdge;
-        case ENTITY:
-            edgeLabel = AtlasGraphUtilsV1.getEdgeLabel(ctx.getVertexPropertyKey());
-            currentEdge = graphHelper.getEdgeForLabel(ctx.getReferringVertex(), edgeLabel);
-            AtlasEntityType instanceType = entityVertexMapper.getInstanceType(ctx.getValue());
-            edge = currentEdge != null ? Optional.of(currentEdge) : Optional.<AtlasEdge>absent();
-            ctx.setElementType(instanceType);
-            ctx.setExistingEdge(edge);
-            newEdge = entityVertexMapper.toGraph(ctx);
-
-            if (currentEdge != null && !currentEdge.equals(newEdge)) {
-                deleteHandler.deleteEdgeReference(currentEdge, ctx.getAttrType().getTypeCategory(), shouldManageChildReferences(ctx.getParentType(), ctx.getAttributeDef().getName()), true);
-            }
-            return newEdge;
-        case MAP:
-            return mapVertexMapper.toGraph(ctx);
-        case ARRAY:
-            return arrVertexMapper.toGraph(ctx);
-        default:
-            throw new AtlasBaseException(AtlasErrorCode.TYPE_CATEGORY_INVALID, ctx.getAttrType().getTypeCategory().name());
-        }
-    }
-
-    protected Object primitivesToVertex(GraphMutationContext ctx) {
-        AtlasGraphUtilsV1.setProperty(ctx.getReferringVertex(), ctx.getVertexPropertyKey(), ctx.getValue());
-        return ctx.getValue();
-    }
-
-    private AtlasEdge createVertex(AtlasStructType parentType, AtlasStructType attrType, AtlasAttributeDef attributeDef, AtlasStruct struct, AtlasVertex referringVertex, String edgeLabel) throws AtlasBaseException {
-        AtlasVertex vertex = createVertexTemplate(struct, attrType);
-        mapAttributestoVertex(EntityMutations.EntityOperation.CREATE, attrType, struct, vertex);
-
-        try {
-            //TODO - Map directly in AtlasGraphUtilsV1
-            return graphHelper.getOrCreateEdge(referringVertex, vertex, edgeLabel);
-        } catch (RepositoryException e) {
-            throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
-        }
-    }
-
-    private void updateVertex(AtlasStructType parentType, AtlasStructType structAttributeType, AtlasAttributeDef attributeDef, AtlasStruct value, AtlasVertex structVertex) throws AtlasBaseException {
-        mapAttributestoVertex(EntityMutations.EntityOperation.CREATE, structAttributeType, value, structVertex);
-    }
-
-    protected AtlasVertex createVertexTemplate(final AtlasStruct instance, final AtlasStructType structType) {
-        LOG.debug("Creating AtlasVertex for type {}", instance.getTypeName());
-        final AtlasVertex vertexWithoutIdentity = graph.addVertex();
-
-        // add type information
-        AtlasGraphUtilsV1.setProperty(vertexWithoutIdentity, Constants.ENTITY_TYPE_PROPERTY_KEY, instance.getTypeName());
-
-        // add state information
-        AtlasGraphUtilsV1.setProperty(vertexWithoutIdentity, Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name());
-
-        // add timestamp information
-        AtlasGraphUtilsV1.setProperty(vertexWithoutIdentity, Constants.TIMESTAMP_PROPERTY_KEY, RequestContextV1.get().getRequestTime());
-        AtlasGraphUtilsV1.setProperty(vertexWithoutIdentity, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY,
-            RequestContextV1.get().getRequestTime());
-
-        AtlasGraphUtilsV1.setProperty(vertexWithoutIdentity, Constants.CREATED_BY_KEY, RequestContextV1.get().getUser());
-
-        GraphHelper.setProperty(vertexWithoutIdentity, Constants.MODIFIED_BY_KEY, RequestContextV1.get().getUser());
-
-        return vertexWithoutIdentity;
-    }
-
-    protected Object mapCollectionElementsToVertex(GraphMutationContext ctx) throws AtlasBaseException {
-        switch(ctx.getAttrType().getTypeCategory()) {
-        case PRIMITIVE:
-        case ENUM:
-            return ctx.getValue();
-        case STRUCT:
-            return toGraph(ctx);
-        case ENTITY:
-            AtlasEntityType instanceType = entityVertexMapper.getInstanceType(ctx.getValue());
-            ctx.setElementType(instanceType);
-            return entityVertexMapper.toGraph(ctx);
-        case MAP:
-        case ARRAY:
-        default:
-            throw new AtlasBaseException(AtlasErrorCode.TYPE_CATEGORY_INVALID, ctx.getAttrType().getTypeCategory().name());
-        }
-    }
-}


Mime
View raw message