Return-Path: X-Original-To: apmail-atlas-commits-archive@minotaur.apache.org Delivered-To: apmail-atlas-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 47C6519900 for ; Sat, 16 Apr 2016 03:58:22 +0000 (UTC) Received: (qmail 20184 invoked by uid 500); 16 Apr 2016 03:58:22 -0000 Delivered-To: apmail-atlas-commits-archive@atlas.apache.org Received: (qmail 20163 invoked by uid 500); 16 Apr 2016 03:58:21 -0000 Mailing-List: contact commits-help@atlas.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@atlas.incubator.apache.org Delivered-To: mailing list commits@atlas.incubator.apache.org Received: (qmail 20154 invoked by uid 99); 16 Apr 2016 03:58:21 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 16 Apr 2016 03:58:21 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 63606180184 for ; Sat, 16 Apr 2016 03:58:21 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.221 X-Spam-Level: X-Spam-Status: No, score=-3.221 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx2-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id xf3ua6Ayn3hr for ; Sat, 16 Apr 2016 03:58:13 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx2-lw-us.apache.org (ASF Mail Server at mx2-lw-us.apache.org) with SMTP id 6AAC05F572 for ; Sat, 16 Apr 2016 03:58:11 +0000 (UTC) Received: (qmail 19967 invoked by uid 99); 16 Apr 2016 03:58:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 16 Apr 2016 03:58:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0A45ADFE04; Sat, 16 Apr 2016 03:58:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: shwethags@apache.org To: commits@atlas.incubator.apache.org Date: Sat, 16 Apr 2016 03:58:12 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/5] incubator-atlas git commit: ATLAS-622 Introduce soft delete (shwethags) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/40ee9492/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java index 7011fdf..a3dc7e5 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java @@ -17,12 +17,13 @@ */ package org.apache.atlas.repository.graph; +import com.google.inject.Inject; import com.thinkaurelius.titan.core.SchemaViolationException; import com.tinkerpop.blueprints.Direction; import com.tinkerpop.blueprints.Edge; import com.tinkerpop.blueprints.Vertex; - import org.apache.atlas.AtlasException; +import org.apache.atlas.RequestContext; import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.RepositoryException; import org.apache.atlas.typesystem.IReferenceableInstance; @@ -36,17 +37,13 @@ import org.apache.atlas.typesystem.persistence.ReferenceableInstance; import org.apache.atlas.typesystem.types.AttributeInfo; import org.apache.atlas.typesystem.types.ClassType; import org.apache.atlas.typesystem.types.DataTypes; -import org.apache.atlas.typesystem.types.DataTypes.TypeCategory; import org.apache.atlas.typesystem.types.EnumValue; -import org.apache.atlas.typesystem.types.IConstructableType; import org.apache.atlas.typesystem.types.IDataType; import org.apache.atlas.typesystem.types.Multiplicity; import org.apache.atlas.typesystem.types.ObjectGraphWalker; -import org.apache.atlas.typesystem.types.StructType; import org.apache.atlas.typesystem.types.TraitType; import org.apache.atlas.typesystem.types.TypeSystem; import org.apache.atlas.typesystem.types.TypeUtils; -import org.apache.atlas.typesystem.types.TypeUtils.Pair; import org.apache.atlas.utils.MD5Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,70 +55,70 @@ import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import static org.apache.atlas.repository.graph.GraphHelper.string; + public final class TypedInstanceToGraphMapper { private static final Logger LOG = LoggerFactory.getLogger(TypedInstanceToGraphMapper.class); private final Map idToVertexMap = new HashMap<>(); - //Maintains a set of Guid based Ids that are referenced/created during graph walk - private final Set referencedIds = new HashSet<>(); private final TypeSystem typeSystem = TypeSystem.getInstance(); - private final List deletedEntityGuids = new ArrayList<>(); - private final List deletedEntities = new ArrayList<>(); - private final GraphToTypedInstanceMapper graphToTypedInstanceMapper; private static final GraphHelper graphHelper = GraphHelper.getInstance(); + private DeleteHandler deleteHandler; + private GraphToTypedInstanceMapper graphToTypedInstanceMapper; + + @Inject + public TypedInstanceToGraphMapper(GraphToTypedInstanceMapper graphToTypedInstanceMapper, DeleteHandler deleteHandler) { + this.graphToTypedInstanceMapper = graphToTypedInstanceMapper; + this.deleteHandler = deleteHandler; + } + private final String SIGNATURE_HASH_PROPERTY_KEY = Constants.INTERNAL_PROPERTY_KEY_PREFIX + "signature"; public enum Operation { CREATE, UPDATE_PARTIAL, - UPDATE_FULL, - DELETE + UPDATE_FULL } - public TypedInstanceToGraphMapper(GraphToTypedInstanceMapper graphToTypedInstanceMapper) { - this.graphToTypedInstanceMapper = graphToTypedInstanceMapper; - } - - TypeUtils.Pair, List> mapTypedInstanceToGraph(Operation operation, ITypedReferenceableInstance... typedInstances) - throws AtlasException { - - List createdIds = new ArrayList<>(); - List updatedIds = new ArrayList<>(); + void mapTypedInstanceToGraph(Operation operation, ITypedReferenceableInstance... typedInstances) + throws AtlasException { + RequestContext requestContext = RequestContext.get(); for (ITypedReferenceableInstance typedInstance : typedInstances) { + LOG.debug("Adding/updating entity {}", typedInstance); Collection newInstances = walkClassInstances(typedInstance); TypeUtils.Pair, List> instancesPair = createVerticesAndDiscoverInstances(newInstances); + List entitiesToCreate = instancesPair.left; + List entitiesToUpdate = instancesPair.right; switch (operation) { - case CREATE: - List ids = addOrUpdateAttributesAndTraits(operation, instancesPair.left); - createdIds.addAll(ids); - addFullTextProperty(instancesPair.left); - break; - - case UPDATE_FULL: - case UPDATE_PARTIAL: - ids = addOrUpdateAttributesAndTraits(Operation.CREATE, instancesPair.left); - createdIds.addAll(ids); - ids = addOrUpdateAttributesAndTraits(operation, instancesPair.right); - updatedIds.addAll(ids); - - addFullTextProperty(instancesPair.left); - addFullTextProperty(instancesPair.right); - break; - - default: - throw new UnsupportedOperationException("Not handled - " + operation); + case CREATE: + List ids = addOrUpdateAttributesAndTraits(operation, entitiesToCreate); + addFullTextProperty(entitiesToCreate); + requestContext.recordCreatedEntities(ids); + break; + + case UPDATE_FULL: + case UPDATE_PARTIAL: + ids = addOrUpdateAttributesAndTraits(Operation.CREATE, entitiesToCreate); + requestContext.recordCreatedEntities(ids); + ids = addOrUpdateAttributesAndTraits(operation, entitiesToUpdate); + requestContext.recordUpdatedEntities(ids); + + addFullTextProperty(entitiesToCreate); + addFullTextProperty(entitiesToUpdate); + break; + + default: + throw new UnsupportedOperationException("Not handled - " + operation); } } - return TypeUtils.Pair.of(createdIds, updatedIds); } private Collection walkClassInstances(ITypedReferenceableInstance typedInstance) @@ -129,7 +126,7 @@ public final class TypedInstanceToGraphMapper { EntityProcessor entityProcessor = new EntityProcessor(); try { - LOG.debug("Walking the object graph for instance {}", typedInstance.getTypeName()); + LOG.debug("Walking the object graph for instance {}", typedInstance.toShortString()); new ObjectGraphWalker(typeSystem, entityProcessor, typedInstance).walk(); } catch (AtlasException me) { throw new RepositoryException("TypeSystem error when walking the ObjectGraph", me); @@ -155,7 +152,7 @@ public final class TypedInstanceToGraphMapper { private String addOrUpdateAttributesAndTraits(Operation operation, ITypedReferenceableInstance typedInstance) throws AtlasException { - LOG.debug("Adding/Updating typed instance {}", typedInstance.getTypeName()); + LOG.debug("Adding/Updating typed instance {}", typedInstance.toShortString()); Id id = typedInstance.getId(); if (id == null) { // oops @@ -173,8 +170,6 @@ public final class TypedInstanceToGraphMapper { if (Operation.CREATE.equals(operation)) { //TODO - Handle Trait updates addTraits(typedInstance, instanceVertex, classType); - } else if (Operation.UPDATE_FULL.equals(operation) || Operation.UPDATE_PARTIAL.equals(operation)) { - GraphHelper.setProperty(instanceVertex, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.valueOf(System.currentTimeMillis())); } return getId(typedInstance)._getId(); } @@ -182,78 +177,58 @@ public final class TypedInstanceToGraphMapper { void mapInstanceToVertex(ITypedInstance typedInstance, Vertex instanceVertex, Map fields, boolean mapOnlyUniqueAttributes, Operation operation) throws AtlasException { - - LOG.debug("Mapping instance {} of {} to vertex {}", typedInstance, typedInstance.getTypeName(), - instanceVertex); + + LOG.debug("Mapping instance {} to vertex {}", typedInstance.toShortString(), string(instanceVertex)); for (AttributeInfo attributeInfo : fields.values()) { if (mapOnlyUniqueAttributes && !attributeInfo.isUnique) { continue; } - mapAttributesToVertex(typedInstance, instanceVertex, attributeInfo, operation); + mapAttributeToVertex(typedInstance, instanceVertex, attributeInfo, operation); } - - if (operation == Operation.DELETE) { - // Remove uni-directional references to the deletion candidate. - removeUnidirectionalReferences(instanceVertex); - - // Remove vertex for deletion candidate. - graphHelper.removeVertex(instanceVertex); - } - + GraphHelper.setProperty(instanceVertex, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, + RequestContext.get().getRequestTime()); } - private String getInstanceName(Vertex referencingVertex, IConstructableType referencingType) { - - if (referencingType.getTypeCategory() == TypeCategory.CLASS) { - Id idFromVertex = GraphHelper.getIdFromVertex(referencingType.getName(), referencingVertex); - String instanceId = referencingType.getName() + ":" + idFromVertex._getId(); - return instanceId; - } - else { - return referencingType.getName(); - } - } - - void mapAttributesToVertex(ITypedInstance typedInstance, Vertex instanceVertex, - AttributeInfo attributeInfo, Operation operation) throws AtlasException { + void mapAttributeToVertex(ITypedInstance typedInstance, Vertex instanceVertex, + AttributeInfo attributeInfo, Operation operation) throws AtlasException { Object attrValue = typedInstance.get(attributeInfo.name); - LOG.debug("mapping attribute {} = {}", attributeInfo.name, attrValue); + LOG.debug("Mapping attribute {} = {}", attributeInfo.name, attrValue); - if (attrValue != null || operation == Operation.UPDATE_FULL || operation == Operation.DELETE) { + if (attrValue != null || operation == Operation.UPDATE_FULL) { switch (attributeInfo.dataType().getTypeCategory()) { - case PRIMITIVE: - case ENUM: - if (operation != Operation.DELETE) { - mapPrimitiveOrEnumToVertex(typedInstance, instanceVertex, attributeInfo); - } - break; - - case ARRAY: - mapArrayCollectionToVertex(typedInstance, instanceVertex, attributeInfo, operation); - break; - - case MAP: - mapMapCollectionToVertex(typedInstance, instanceVertex, attributeInfo, operation); - break; - - case STRUCT: - case CLASS: - final String propertyName = GraphHelper.getQualifiedFieldName(typedInstance, attributeInfo); - String edgeLabel = GraphHelper.getEdgeLabel(typedInstance, attributeInfo); - Iterator outGoingEdgesIterator = - GraphHelper.getOutGoingEdgesByLabel(instanceVertex, edgeLabel).iterator(); - String currentEntry = - outGoingEdgesIterator.hasNext() ? outGoingEdgesIterator.next().getId().toString() : null; - addOrUpdateCollectionEntry(instanceVertex, attributeInfo, attributeInfo.dataType(), attrValue, - currentEntry, propertyName, operation); - break; - - case TRAIT: - // do NOTHING - this is taken care of earlier - break; - - default: - throw new IllegalArgumentException("Unknown type category: " + attributeInfo.dataType().getTypeCategory()); + case PRIMITIVE: + case ENUM: + mapPrimitiveOrEnumToVertex(typedInstance, instanceVertex, attributeInfo); + break; + + case ARRAY: + mapArrayCollectionToVertex(typedInstance, instanceVertex, attributeInfo, operation); + break; + + case MAP: + mapMapCollectionToVertex(typedInstance, instanceVertex, attributeInfo, operation); + break; + + case STRUCT: + case CLASS: + String edgeLabel = GraphHelper.getEdgeLabel(typedInstance, attributeInfo); + + Edge currentEdge = GraphHelper.getEdgeForLabel(instanceVertex, edgeLabel); + String newEdgeId = addOrUpdateReference(instanceVertex, attributeInfo, attributeInfo.dataType(), + attrValue, currentEdge, edgeLabel, operation); + + if (currentEdge != null && !currentEdge.getId().toString().equals(newEdgeId)) { + deleteHandler.deleteReference(currentEdge, attributeInfo.dataType().getTypeCategory(), + attributeInfo.isComposite); + } + break; + + case TRAIT: + // do NOTHING - this is taken care of earlier + break; + + default: + throw new IllegalArgumentException("Unknown type category: " + attributeInfo.dataType().getTypeCategory()); } } } @@ -265,18 +240,18 @@ public final class TypedInstanceToGraphMapper { List instancesToUpdate = new ArrayList<>(); for (IReferenceableInstance instance : instances) { - LOG.debug("Discovering instance to create/update for {}", instance); + LOG.debug("Discovering instance to create/update for {}", instance.toShortString()); ITypedReferenceableInstance newInstance; Id id = instance.getId(); if (!idToVertexMap.containsKey(id)) { Vertex instanceVertex; if (id.isAssigned()) { // has a GUID - LOG.debug("Instance {} has an assigned id", instance.getId()._getId()); + LOG.debug("Instance has an assigned id {}", instance.getId()._getId()); instanceVertex = graphHelper.getVertexForGUID(id.id); if (!(instance instanceof ReferenceableInstance)) { throw new IllegalStateException( - String.format("%s is not of type ITypedReferenceableInstance", instance)); + String.format("%s is not of type ITypedReferenceableInstance", instance.toShortString())); } newInstance = (ITypedReferenceableInstance) instance; instancesToUpdate.add(newInstance); @@ -288,7 +263,7 @@ public final class TypedInstanceToGraphMapper { //no entity with the given unique attribute, create new if (instanceVertex == null) { - LOG.debug("Creating new vertex for instance {}", instance); + LOG.debug("Creating new vertex for instance {}", instance.toShortString()); newInstance = classType.convert(instance, Multiplicity.REQUIRED); instanceVertex = graphHelper.createVertexWithIdentity(newInstance, classType.getAllSuperTypeNames()); instancesToCreate.add(newInstance); @@ -297,21 +272,18 @@ public final class TypedInstanceToGraphMapper { mapInstanceToVertex(newInstance, instanceVertex, classType.fieldMapping().fields, true, Operation.CREATE); } else { - LOG.debug("Re-using existing vertex {} for instance {}", instanceVertex.getId(), instance); + LOG.debug("Re-using existing vertex {} for instance {}", string(instanceVertex), instance.toShortString()); if (!(instance instanceof ReferenceableInstance)) { throw new IllegalStateException( - String.format("%s is not of type ITypedReferenceableInstance", instance)); + String.format("%s is not of type ITypedReferenceableInstance", instance.toShortString())); } newInstance = (ITypedReferenceableInstance) instance; instancesToUpdate.add(newInstance); } - } //Set the id in the new instance idToVertexMap.put(id, instanceVertex); - referencedIds.add(GraphHelper.getIdFromVertex(instance.getTypeName(), instanceVertex)); - } } return TypeUtils.Pair.of(instancesToCreate, instancesToUpdate); @@ -338,165 +310,158 @@ public final class TypedInstanceToGraphMapper { } } - /******************************************** STRUCT **************************************************/ + /******************************************** ARRAY **************************************************/ - private TypeUtils.Pair updateStructVertex(ITypedStruct structInstance, Edge relEdge, - Operation operation) throws AtlasException { - //Already existing vertex. Update - Vertex structInstanceVertex = relEdge.getVertex(Direction.IN); + private void mapArrayCollectionToVertex(ITypedInstance typedInstance, Vertex instanceVertex, + AttributeInfo attributeInfo, Operation operation) throws AtlasException { + LOG.debug("Mapping instance {} for array attribute {} vertex {}", typedInstance.toShortString(), + attributeInfo.name, string(instanceVertex)); - // Update attributes - final MessageDigest digester = MD5Utils.getDigester(); - String newSignature = structInstance.getSignatureHash(digester); - String curSignature = structInstanceVertex.getProperty(SIGNATURE_HASH_PROPERTY_KEY); + List newElements = (List) typedInstance.get(attributeInfo.name); + boolean newAttributeEmpty = (newElements == null || newElements.isEmpty()); - if (!newSignature.equals(curSignature)) { - //Update struct vertex instance only if there is a change - LOG.debug("Updating struct {} since signature has changed {} {} ", structInstance, curSignature, newSignature); - mapInstanceToVertex(structInstance, structInstanceVertex, structInstance.fieldMapping().fields, false, operation); - GraphHelper.setProperty(structInstanceVertex, SIGNATURE_HASH_PROPERTY_KEY, String.valueOf(newSignature)); + if (newAttributeEmpty && operation != Operation.UPDATE_FULL) { + return; } - return TypeUtils.Pair.of(structInstanceVertex, relEdge); - } - private TypeUtils.Pair addStructVertex(ITypedStruct structInstance, Vertex instanceVertex, - AttributeInfo attributeInfo, String edgeLabel) throws AtlasException { - // add a new vertex for the struct or trait instance - Vertex structInstanceVertex = graphHelper.createVertexWithoutIdentity(structInstance.getTypeName(), null, - Collections.emptySet()); // no super types for struct type - LOG.debug("created vertex {} for struct {} value {}", structInstanceVertex, attributeInfo.name, structInstance); - - // map all the attributes to this new vertex - mapInstanceToVertex(structInstance, structInstanceVertex, structInstance.fieldMapping().fields, false, Operation.CREATE); - // add an edge to the newly created vertex from the parent - Edge relEdge = graphHelper.addEdge(instanceVertex, structInstanceVertex, edgeLabel); - - return TypeUtils.Pair.of(structInstanceVertex, relEdge); - } - - /******************************************** ARRAY **************************************************/ - - private void mapArrayCollectionToVertex(ITypedInstance typedInstance, Vertex instanceVertex, - AttributeInfo attributeInfo, Operation operation) throws AtlasException { - LOG.debug("Mapping instance {} to vertex {} for name {}", typedInstance.getTypeName(), instanceVertex, - attributeInfo.name); - List newElements = (List) typedInstance.get(attributeInfo.name); - boolean empty = (newElements == null || newElements.isEmpty()); - if (!empty || operation == Operation.UPDATE_FULL || operation == Operation.DELETE) { - String propertyName = GraphHelper.getQualifiedFieldName(typedInstance, attributeInfo); - List currentEntries = instanceVertex.getProperty(propertyName); + String propertyName = GraphHelper.getQualifiedFieldName(typedInstance, attributeInfo); + List currentElements = instanceVertex.getProperty(propertyName); + IDataType elementType = ((DataTypes.ArrayType) attributeInfo.dataType()).getElemType(); + List newElementsCreated = new ArrayList<>(); - IDataType elementType = ((DataTypes.ArrayType) attributeInfo.dataType()).getElemType(); - List newEntries = new ArrayList<>(); + if (!newAttributeEmpty) { if (newElements != null && !newElements.isEmpty()) { int index = 0; for (; index < newElements.size(); index++) { - String currentEntry = - (currentEntries != null && index < currentEntries.size()) ? currentEntries.get(index) : null; + String currentElement = (currentElements != null && index < currentElements.size()) ? + currentElements.get(index) : null; + LOG.debug("Adding/updating element at position {}, current element {}, new element {}", index, + currentElement, newElements.get(index)); String newEntry = addOrUpdateCollectionEntry(instanceVertex, attributeInfo, elementType, - newElements.get(index), currentEntry, propertyName, operation); - newEntries.add(newEntry); + newElements.get(index), currentElement, propertyName, operation); + newElementsCreated.add(newEntry); } + } + } - //Remove extra entries in the list - if (currentEntries != null) { - if (index < currentEntries.size()) { - for (; index < currentEntries.size(); index++) { - if (elementType.getTypeCategory() == TypeCategory.CLASS) { - final String edgeId = currentEntries.get(index); - final Pair edgeAndTargetVertex = GraphHelper.getInstance().getEdgeAndTargetVertex(edgeId); - Id guid = GraphHelper.getIdFromVertex(elementType.getName(), edgeAndTargetVertex.right); - removeUnusedClassReference(edgeId, attributeInfo, elementType, !referencedIds.contains(guid)); - } else if (elementType.getTypeCategory() == TypeCategory.STRUCT) { - removeUnusedStructReference(currentEntries.get(index), attributeInfo, elementType); - } - } - } - } + // for dereference on way out + GraphHelper.setProperty(instanceVertex, propertyName, newElementsCreated); + + removeUnusedEntries(currentElements, newElementsCreated, elementType, attributeInfo); + } + + private void removeUnusedEntries(List currentEntries, List newEntries, IDataType entryType, + AttributeInfo attributeInfo) throws AtlasException { + if (currentEntries == null || currentEntries.isEmpty()) { + return; + } + + LOG.debug("Removing unused entries from the old collection"); + if (entryType.getTypeCategory() == DataTypes.TypeCategory.STRUCT + || entryType.getTypeCategory() == DataTypes.TypeCategory.CLASS) { + + //Get map of edge id to edge + Map edgeMap = new HashMap<>(); + getEdges(currentEntries, edgeMap); + getEdges(newEntries, edgeMap); + + //Get final set of in vertices + Set newInVertices = new HashSet<>(); + for (String edgeId : newEntries) { + Vertex inVertex = edgeMap.get(edgeId).getVertex(Direction.IN); + newInVertices.add(inVertex.getId().toString()); } - else if (operation == Operation.UPDATE_FULL || operation == Operation.DELETE) { - // Clear all existing entries - if (currentEntries != null) { - for (String entry : currentEntries) { - if (elementType.getTypeCategory() == TypeCategory.CLASS) { - removeUnusedClassReference(entry, attributeInfo, elementType, true); - } else if(elementType.getTypeCategory() == TypeCategory.STRUCT) { - removeUnusedStructReference(entry, attributeInfo, elementType); - } + + //Remove the edges for (current edges - new edges) + List cloneElements = new ArrayList<>(currentEntries); + cloneElements.removeAll(newEntries); + LOG.debug("Removing unused entries from the old collection - {}", cloneElements); + + if (!cloneElements.isEmpty()) { + for (String edgeIdForDelete : cloneElements) { + Edge edge = edgeMap.get(edgeIdForDelete); + Vertex inVertex = edge.getVertex(Direction.IN); + if (newInVertices.contains(inVertex.getId().toString())) { + //If the edge.inVertex is in the new set of in vertices, just delete the edge + deleteHandler.deleteEdge(edge, true); + } else { + //else delete the edge + vertex + deleteHandler.deleteReference(edge, entryType.getTypeCategory(), attributeInfo.isComposite); } } } + } + } - // for dereference on way out - GraphHelper.setProperty(instanceVertex, propertyName, newEntries); + private void getEdges(List edgeIds, Map edgeMap) { + if (edgeIds == null) { + return; + } + + for (String edgeId : edgeIds) { + if (!edgeMap.containsKey(edgeId)) { + edgeMap.put(edgeId, graphHelper.getEdgeById(edgeId)); + } } } + /******************************************** MAP **************************************************/ private void mapMapCollectionToVertex(ITypedInstance typedInstance, Vertex instanceVertex, - AttributeInfo attributeInfo, Operation operation) throws AtlasException { - LOG.debug("Mapping instance {} to vertex {} for name {}", typedInstance.getTypeName(), instanceVertex, + AttributeInfo attributeInfo, Operation operation) throws AtlasException { + LOG.debug("Mapping instance {} to vertex {} for attribute {}", typedInstance.toShortString(), string(instanceVertex), attributeInfo.name); - @SuppressWarnings("unchecked") Map collection = - (Map) typedInstance.get(attributeInfo.name); - boolean empty = (collection == null || collection.isEmpty()); - if (!empty || operation == Operation.UPDATE_FULL || operation == Operation.DELETE) { + @SuppressWarnings("unchecked") Map newAttribute = + (Map) typedInstance.get(attributeInfo.name); - String propertyName = GraphHelper.getQualifiedFieldName(typedInstance, attributeInfo); - IDataType elementType = ((DataTypes.MapType) attributeInfo.dataType()).getValueType(); + boolean newAttributeEmpty = (newAttribute == null || newAttribute.isEmpty()); + if (newAttributeEmpty && operation != Operation.UPDATE_FULL) { + return; + } - if (!empty) { - for (Map.Entry entry : collection.entrySet()) { - String myPropertyName = propertyName + "." + entry.getKey().toString(); + IDataType elementType = ((DataTypes.MapType) attributeInfo.dataType()).getValueType(); + String propertyName = GraphHelper.getQualifiedFieldName(typedInstance, attributeInfo); + List currentElements = new ArrayList<>(); + List newElementsCreated = new ArrayList<>(); + List newKeysCreated = new ArrayList<>(); - String currentEntry = instanceVertex.getProperty(myPropertyName); - String newEntry = addOrUpdateCollectionEntry(instanceVertex, attributeInfo, elementType, - entry.getValue(), currentEntry, myPropertyName, operation); + if (!newAttributeEmpty) { + for (Map.Entry entry : newAttribute.entrySet()) { + String propertyNameForKey = GraphHelper.getQualifiedNameForMapKey(propertyName, entry.getKey().toString()); + newKeysCreated.add(entry.getKey().toString()); - //Add/Update/Remove property value - GraphHelper.setProperty(instanceVertex, myPropertyName, newEntry); - } - } - - //Remove unused key references - List origKeys = instanceVertex.getProperty(propertyName); - if (origKeys != null) { - if (collection != null) { - origKeys.removeAll(collection.keySet()); - } - for (Object unusedKey : origKeys) { - String edgeLabel = GraphHelper.getEdgeLabel(typedInstance, attributeInfo) + "." + unusedKey; - if (instanceVertex.getEdges(Direction.OUT, edgeLabel).iterator().hasNext()) { - Edge edge = instanceVertex.getEdges(Direction.OUT, edgeLabel).iterator().next(); - if (TypeCategory.STRUCT.equals(((DataTypes.MapType) attributeInfo.dataType()).getValueType().getTypeCategory())) { - removeUnusedStructReference(edge.getId().toString(), attributeInfo, - ((DataTypes.MapType) attributeInfo.dataType()).getValueType()); - } else if(TypeCategory.CLASS.equals(((DataTypes.MapType) attributeInfo.dataType()).getValueType().getTypeCategory())){ - final Vertex targetVertex = edge.getVertex(Direction.OUT); - Id guid = GraphHelper.getIdFromVertex(elementType.getName(), targetVertex); - removeUnusedClassReference(edge.getId().toString(), attributeInfo, elementType, !referencedIds.contains(guid)); } - } + String currentEntry = instanceVertex.getProperty(propertyNameForKey); + if (currentEntry != null) { + currentElements.add(currentEntry); } - } - // for dereference on way out - GraphHelper.setProperty(instanceVertex, propertyName, collection == null ? null : new ArrayList(collection.keySet())); + String newEntry = addOrUpdateCollectionEntry(instanceVertex, attributeInfo, elementType, + entry.getValue(), currentEntry, propertyNameForKey, operation); + + //Add/Update/Remove property value + GraphHelper.setProperty(instanceVertex, propertyNameForKey, newEntry); + newElementsCreated.add(newEntry); + } } + + // for dereference on way out + GraphHelper.setProperty(instanceVertex, propertyName, newKeysCreated); + + removeUnusedEntries(currentElements, newElementsCreated, elementType, attributeInfo); } /******************************************** ARRAY & MAP **************************************************/ private String addOrUpdateCollectionEntry(Vertex instanceVertex, AttributeInfo attributeInfo, - IDataType elementType, Object newVal, String curVal, String propertyName, - Operation operation) - throws AtlasException { + IDataType elementType, Object newAttributeValue, String currentValue, + String propertyName, Operation operation) + throws AtlasException { - final String edgeLabel = GraphHelper.EDGE_LABEL_PREFIX + propertyName; switch (elementType.getTypeCategory()) { case PRIMITIVE: case ENUM: - return newVal != null ? newVal.toString() : null; + return newAttributeValue != null ? newAttributeValue.toString() : null; case ARRAY: case MAP: @@ -505,67 +470,113 @@ public final class TypedInstanceToGraphMapper { return null; case STRUCT: - return addOrUpdateStruct(instanceVertex, attributeInfo, elementType, (ITypedStruct) newVal, curVal, edgeLabel, operation); - case CLASS: - return addOrUpdateClassVertex(instanceVertex, attributeInfo, elementType, - (ITypedReferenceableInstance) newVal, curVal, edgeLabel, operation); + final String edgeLabel = GraphHelper.EDGE_LABEL_PREFIX + propertyName; + Edge currentEdge = graphHelper.getEdgeById(currentValue); + return addOrUpdateReference(instanceVertex, attributeInfo, elementType, newAttributeValue, currentEdge, + edgeLabel, operation); default: throw new IllegalArgumentException("Unknown type category: " + elementType.getTypeCategory()); } } - private String addOrUpdateStruct(Vertex instanceVertex, AttributeInfo attributeInfo, IDataType elementType, - ITypedStruct structAttr, String curVal, + private String addOrUpdateReference(Vertex instanceVertex, AttributeInfo attributeInfo, + IDataType attributeType, Object newAttributeValue, Edge currentEdge, + String edgeLabel, Operation operation) throws AtlasException { + switch (attributeType.getTypeCategory()) { + case STRUCT: + return addOrUpdateStruct(instanceVertex, attributeInfo, (ITypedStruct) newAttributeValue, currentEdge, + edgeLabel, operation); + + case CLASS: + return addOrUpdateClassVertex(instanceVertex, currentEdge, + (ITypedReferenceableInstance) newAttributeValue, attributeInfo, edgeLabel); + + default: + throw new IllegalArgumentException("Unknown type category: " + attributeType.getTypeCategory()); + } + } + /******************************************** STRUCT **************************************************/ + + private String addOrUpdateStruct(Vertex instanceVertex, AttributeInfo attributeInfo, + ITypedStruct newAttributeValue, Edge currentEdge, String edgeLabel, Operation operation) throws AtlasException { - TypeUtils.Pair vertexEdgePair = null; - if (curVal != null && structAttr == null) { - //remove edge - removeUnusedStructReference(curVal, attributeInfo, elementType); - } else if (curVal != null && structAttr != null) { + String newEdgeId = null; + if (currentEdge != null && newAttributeValue != null) { //update - Edge edge = graphHelper.getOutGoingEdgeById(curVal); - vertexEdgePair = updateStructVertex(structAttr, edge, operation); - } else if (structAttr != null) { + updateStructVertex(newAttributeValue, currentEdge, operation); + newEdgeId = currentEdge.getId().toString(); + } else if (currentEdge == null && newAttributeValue != null) { //add - vertexEdgePair = addStructVertex(structAttr, instanceVertex, attributeInfo, edgeLabel); + Edge newEdge = addStructVertex(newAttributeValue, instanceVertex, attributeInfo, edgeLabel); + newEdgeId = newEdge.getId().toString(); } + return newEdgeId; + } + + private Edge addStructVertex(ITypedStruct structInstance, Vertex instanceVertex, + AttributeInfo attributeInfo, String edgeLabel) throws AtlasException { + // add a new vertex for the struct or trait instance + Vertex structInstanceVertex = graphHelper.createVertexWithoutIdentity(structInstance.getTypeName(), null, + Collections.emptySet()); // no super types for struct type + LOG.debug("created vertex {} for struct {} value {}", string(structInstanceVertex), attributeInfo.name, + structInstance.toShortString()); - return (vertexEdgePair != null) ? vertexEdgePair.right.getId().toString() : null; + // map all the attributes to this new vertex + mapInstanceToVertex(structInstance, structInstanceVertex, structInstance.fieldMapping().fields, false, + Operation.CREATE); + // add an edge to the newly created vertex from the parent + Edge newEdge = graphHelper.addEdge(instanceVertex, structInstanceVertex, edgeLabel); + + return newEdge; } - private String addOrUpdateClassVertex(Vertex instanceVertex, AttributeInfo attributeInfo, IDataType elementType, - ITypedReferenceableInstance newVal, String curVal, - String edgeLabel, Operation operation) throws AtlasException { - Vertex toVertex = getClassVertex(newVal); - if(toVertex == null && newVal != null) { - LOG.error("Could not find vertex for Class Reference " + newVal); - throw new EntityNotFoundException("Could not find vertex for Class Reference " + newVal); - } + private void updateStructVertex(ITypedStruct newAttributeValue, Edge currentEdge, + Operation operation) throws AtlasException { + //Already existing vertex. Update + Vertex structInstanceVertex = currentEdge.getVertex(Direction.IN); - TypeUtils.Pair vertexEdgePair = null; - if (curVal != null && newVal == null) { - //remove edge - removeUnusedClassReference(curVal, attributeInfo, elementType, true); - } else if (curVal != null && newVal != null) { - Edge edge = graphHelper.getOutGoingEdgeById(curVal); - Id classRefId = getId(newVal); - vertexEdgePair = updateClassEdge(classRefId, newVal, instanceVertex, edge, toVertex, attributeInfo, - elementType, edgeLabel, operation); - } else if (newVal != null){ - vertexEdgePair = addClassEdge(instanceVertex, toVertex, edgeLabel); - } + LOG.debug("Updating struct vertex {} with struct {}", string(structInstanceVertex), newAttributeValue.toShortString()); + + // Update attributes + final MessageDigest digester = MD5Utils.getDigester(); + String newSignature = newAttributeValue.getSignatureHash(digester); + String curSignature = structInstanceVertex.getProperty(SIGNATURE_HASH_PROPERTY_KEY); - return (vertexEdgePair != null) ? vertexEdgePair.right.getId().toString() : null; + if (!newSignature.equals(curSignature)) { + //Update struct vertex instance only if there is a change + LOG.debug("Updating struct {} since signature has changed {} {} ", newAttributeValue, curSignature, newSignature); + mapInstanceToVertex(newAttributeValue, structInstanceVertex, newAttributeValue.fieldMapping().fields, false, operation); + GraphHelper.setProperty(structInstanceVertex, SIGNATURE_HASH_PROPERTY_KEY, String.valueOf(newSignature)); + } } /******************************************** CLASS **************************************************/ - private TypeUtils.Pair addClassEdge(Vertex instanceVertex, Vertex toVertex, String edgeLabel) throws AtlasException { - // add an edge to the class vertex from the instance - Edge edge = graphHelper.addEdge(instanceVertex, toVertex, edgeLabel); - return TypeUtils.Pair.of(toVertex, edge); + private String addOrUpdateClassVertex(Vertex instanceVertex, Edge currentEdge, + ITypedReferenceableInstance newAttributeValue, AttributeInfo attributeInfo, + String edgeLabel) throws AtlasException { + Vertex newReferenceVertex = getClassVertex(newAttributeValue); + if(newReferenceVertex == null && newAttributeValue != null) { + LOG.error("Could not find vertex for Class Reference " + newAttributeValue); + throw new EntityNotFoundException("Could not find vertex for Class Reference " + newAttributeValue); + } + + String newEdgeId = null; + if (currentEdge != null && newAttributeValue != null) { + newEdgeId = updateClassEdge(instanceVertex, currentEdge, newAttributeValue, newReferenceVertex, + attributeInfo, edgeLabel); + } else if (currentEdge == null && newAttributeValue != null){ + Edge newEdge = addClassEdge(instanceVertex, newReferenceVertex, edgeLabel); + newEdgeId = newEdge.getId().toString(); + } + return newEdgeId; + } + + private Edge addClassEdge(Vertex instanceVertex, Vertex toVertex, String edgeLabel) throws AtlasException { + // add an edge to the class vertex from the instance + return graphHelper.addEdge(instanceVertex, toVertex, edgeLabel); } private Vertex getClassVertex(ITypedReferenceableInstance typedReference) throws EntityNotFoundException { @@ -598,50 +609,36 @@ public final class TypedInstanceToGraphMapper { } - private TypeUtils.Pair updateClassEdge(Id id, final ITypedReferenceableInstance typedInstance, - Vertex instanceVertex, Edge edge, Vertex toVertex, - AttributeInfo attributeInfo, IDataType dataType, - String edgeLabel, Operation operation) throws AtlasException { - TypeUtils.Pair result = TypeUtils.Pair.of(toVertex, edge); - Edge newEdge = edge; + private String updateClassEdge(Vertex instanceVertex, Edge currentEdge, + ITypedReferenceableInstance newAttributeValue, + Vertex newVertex, AttributeInfo attributeInfo, + String edgeLabel) throws AtlasException { + LOG.debug("Updating {} for reference attribute {}", string(currentEdge), attributeInfo.name); // Update edge if it exists - Vertex invertex = edge.getVertex(Direction.IN); - String currentGUID = invertex.getProperty(Constants.GUID_PROPERTY_KEY); - Id currentId = new Id(currentGUID, 0, (String) invertex.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY)); - if (!currentId.equals(id)) { + Vertex currentVertex = currentEdge.getVertex(Direction.IN); + String currentEntityId = GraphHelper.getIdFromVertex(currentVertex); + String newEntityId = getId(newAttributeValue).id; + String newEdgeId = currentEdge.getId().toString(); + if (!currentEntityId.equals(newEntityId)) { // add an edge to the class vertex from the instance - if (toVertex != null) { - newEdge = graphHelper.addEdge(instanceVertex, toVertex, edgeLabel); - result = TypeUtils.Pair.of(toVertex, newEdge); - } - - //Delete vertex only if the IdtoVertex map doesnt have it in future references - removeUnusedClassReference(edge.getId().toString(), attributeInfo, dataType, !referencedIds.contains(currentId)); - } - - - if (attributeInfo.isComposite) { - //Update the attributes also if composite - if (typedInstance.fieldMapping() != null) { - //In case of Id instance, fieldMapping is null - mapInstanceToVertex(typedInstance, toVertex, typedInstance.fieldMapping().fields , false, operation); - //Update full text for the updated composite vertex - addFullTextProperty(new ArrayList() {{ add(typedInstance); }}); + if (newVertex != null) { + Edge newEdge = graphHelper.getOrCreateEdge(instanceVertex, newVertex, edgeLabel); + newEdgeId = newEdge.getId().toString(); } } - return result; + return newEdgeId; } /******************************************** TRAITS ****************************************************/ void mapTraitInstanceToVertex(ITypedStruct traitInstance, IDataType entityType, Vertex parentInstanceVertex) - throws AtlasException { + throws AtlasException { // add a new vertex for the struct or trait instance final String traitName = traitInstance.getTypeName(); Vertex traitInstanceVertex = graphHelper.createVertexWithoutIdentity(traitInstance.getTypeName(), null, typeSystem.getDataType(TraitType.class, traitName).getAllSuperTypeNames()); - LOG.debug("created vertex {} for trait {}", traitInstanceVertex, traitName); + LOG.debug("created vertex {} for trait {}", string(traitInstanceVertex), traitName); // map all the attributes to this newly created vertex mapInstanceToVertex(traitInstance, traitInstanceVertex, traitInstance.fieldMapping().fields, false, Operation.CREATE); @@ -692,314 +689,6 @@ public final class TypedInstanceToGraphMapper { } } - GraphHelper.setProperty(instanceVertex, vertexPropertyName, propertyValue); } - - private Edge removeUnusedClassReference(String edgeId, AttributeInfo attributeInfo, IDataType elementType, boolean deleteReferredVertex) throws AtlasException { - // Remove edge to disconnect struct or class reference. - // For struct or composite class reference, also delete the target instance. - Edge removedRelation = null; - TypeUtils.Pair edgeAndVertex = graphHelper.getEdgeAndTargetVertex(edgeId); - if (attributeInfo.isComposite) { - // For uni-directional reference, remove the edge. - // For bi-directional reference, the edges are removed - // when the composite entity is deleted. - if (attributeInfo.reverseAttributeName == null) { - graphHelper.removeEdge(edgeAndVertex.left); - removedRelation = edgeAndVertex.left; - } - - // Delete the contained entity. - if (deleteReferredVertex) { - if (LOG.isDebugEnabled()) { - Vertex sourceVertex = edgeAndVertex.left.getVertex(Direction.OUT); - String sourceTypeName = GraphHelper.getTypeName(sourceVertex); - LOG.debug("Deleting composite entity {}:{} contained by {}:{} through reference {}", - elementType.getName(), GraphHelper.getIdFromVertex(elementType.getName(), edgeAndVertex.right)._getId(), - sourceTypeName, GraphHelper.getIdFromVertex(sourceTypeName, sourceVertex)._getId(), - attributeInfo.name); - } - deleteEntity(elementType.getName(), edgeAndVertex.right); - } - } - else { - if (attributeInfo.reverseAttributeName != null) { - // Disconnect both ends of the bi-directional reference - removeReverseReference(edgeAndVertex, attributeInfo); - } - graphHelper.removeEdge(edgeAndVertex.left); - removedRelation = edgeAndVertex.left; - - return removedRelation; - } - return removedRelation; - - } - - private Edge removeUnusedStructReference(String edgeId, AttributeInfo attributeInfo, IDataType elementType) throws AtlasException { - // Remove edge to disconnect struct or class reference. - // For struct or composite class reference, also delete the target instance. - Edge removedRelation = null; - TypeUtils.Pair edgeAndVertex = graphHelper.getEdgeAndTargetVertex(edgeId); - graphHelper.removeEdge(edgeAndVertex.left); - removedRelation = edgeAndVertex.left; - - // Create an empty instance to use for clearing all struct attributes. - StructType structType = (StructType) elementType; - ITypedStruct typedInstance = structType.createInstance(); - - // Delete target vertex and any underlying structs and composite entities owned by this struct. - mapInstanceToVertex(typedInstance, edgeAndVertex.right, structType.fieldMapping().fields, false, Operation.DELETE); - return removedRelation; - } - - /** - * Remove the reverse reference value for the specified edge and vertex. - * - * @param edgeAndVertex - * @param attributeInfo - * @throws AtlasException - */ - private void removeReverseReference(TypeUtils.Pair edgeAndVertex, - AttributeInfo attributeInfo) throws AtlasException { - - Vertex sourceVertex = edgeAndVertex.left.getVertex(Direction.OUT); - String inverseTypeName = GraphHelper.getTypeName(edgeAndVertex.right); - IConstructableType inverseType = typeSystem.getDataType(IConstructableType.class, inverseTypeName); - AttributeInfo inverseAttributeInfo = inverseType.fieldMapping().fields.get(attributeInfo.reverseAttributeName); - String inverseEdgeLabel = GraphHelper.getEdgeLabel(inverseType, inverseAttributeInfo); - TypeCategory inverseTypeCategory = inverseAttributeInfo.dataType().getTypeCategory(); - - // Find and remove the edge which represents the inverse reference value. - Iterable inverseEdges = GraphHelper.getOutGoingEdgesByLabel(edgeAndVertex.right, inverseEdgeLabel); - Edge removedEdge = null; - // Search for the edge which references the source vertex. - for (Edge edge : inverseEdges) { - Vertex vertex = edge.getVertex(Direction.IN); - if (vertex.equals(sourceVertex)) { - // Found the edge which points back at source vertex. - // Disconnect the reference by removing the edge and - // removing the edge ID from the vertex property. - removeReferenceValue(edge, new AtlasEdgeLabel(edge.getLabel()), edgeAndVertex.right, inverseType, inverseTypeCategory); - removedEdge = edge; - break; - } - } - if (removedEdge != null) { - if (LOG.isDebugEnabled()) { - String sourceTypeName = GraphHelper.getTypeName(sourceVertex); - LOG.debug("Removed edge {} for reverse reference {} from {}:{} to {}:{} ", removedEdge, - GraphHelper.getQualifiedFieldName(inverseType, inverseAttributeInfo.name), - inverseTypeName, GraphHelper.getIdFromVertex(inverseTypeName, edgeAndVertex.right)._getId(), - sourceTypeName, GraphHelper.getIdFromVertex(sourceTypeName, sourceVertex)._getId()); - } - } - else { - // We didn't find the edge for the inverse reference. - // Since Atlas currently does not automatically set - // the inverse reference when a reference value is updated, - // unbalanced references are not unexpected. - // The presence of inverse reference values depends on - // well behaved client applications which explicitly set - // both ends of the reference. - // TODO: throw an exception as it indicates a unbalanced reference? - String sourceTypeName = GraphHelper.getTypeName(sourceVertex); - LOG.warn("No edge found for inverse reference {} on vertex {} for entity instance {}:{} which points back to vertex {} for {}:{}", - inverseAttributeInfo.name, edgeAndVertex.right, - inverseTypeName, GraphHelper.getIdFromVertex(inverseTypeName, edgeAndVertex.right)._getId(), - sourceVertex, sourceTypeName, GraphHelper.getIdFromVertex(sourceTypeName, sourceVertex)._getId()); - } - } - - /** - * Remove any unidirectional map or array reference to a class, struct, or trait vertex. - * This involves removing appropriate value from the vertex property which holds the - * reference values. - * - * @param targetVertex a vertex which represents a class, struct, or trait instance - * @throws AtlasException - */ - private void removeUnidirectionalReferences(Vertex targetVertex) throws AtlasException { - - // Search for any remaining incoming edges that represent unidirectional references - // to the target vertex. - Iterable incomingEdges = targetVertex.getEdges(Direction.IN); - for (Edge edge : incomingEdges) { - String label = edge.getLabel(); - AtlasEdgeLabel atlasEdgeLabel = new AtlasEdgeLabel(label); - Vertex referencingVertex = edge.getVertex(Direction.OUT); - String typeName = atlasEdgeLabel.getTypeName(); - IConstructableType referencingType = typeSystem.getDataType(IConstructableType.class, typeName); - - AttributeInfo attributeInfo = referencingType.fieldMapping().fields.get(atlasEdgeLabel.getAttributeName()); - if (attributeInfo == null) { - String instanceId = getInstanceName(referencingVertex, referencingType); - throw new AtlasException("Outgoing edge " + edge.getId().toString() - + " for " + instanceId + "(vertex " + referencingVertex + "): label " + label - + " has an attribute name " + atlasEdgeLabel.getAttributeName() + " that is undefined on " - + referencingType.getTypeCategory() + " " + typeName); - } - // Remove the appropriate value from the vertex property for this reference. - removeReferenceValue(edge, atlasEdgeLabel, referencingVertex, referencingType, attributeInfo.dataType().getTypeCategory()); - } - } - - private Pair removeReferenceValue(Edge edge, AtlasEdgeLabel atlasEdgeLabel, - Vertex referencingVertex, IConstructableType referencingType, TypeCategory attrTypeCategory) - throws AtlasException { - - graphHelper.removeEdge(edge); - if (attrTypeCategory != TypeCategory.ARRAY && attrTypeCategory != TypeCategory.MAP) { - // Multiplicity-one reference is represented by the edge, - // there is no vertex property to update. So just remove the edge. - return new Pair(edge.getId().toString(), Boolean.TRUE); - } - List currentRefValues = referencingVertex.getProperty(atlasEdgeLabel.getQualifiedAttributeName()); - List newRefValues = new ArrayList<>(currentRefValues); - Pair refValueRemoved = null; - if (attrTypeCategory == TypeCategory.ARRAY) { - refValueRemoved = removeArrayReferenceValue(atlasEdgeLabel, referencingVertex, edge, newRefValues); - } - else { - refValueRemoved = removeMapReferenceValue(atlasEdgeLabel, referencingVertex, edge, newRefValues); - } - if (refValueRemoved.right) { - if (LOG.isDebugEnabled()) { - String instanceId = getInstanceName(referencingVertex, referencingType); - LOG.debug("Reference value {} removed from reference {} on vertex {} for instance of {} {}", - refValueRemoved.left, atlasEdgeLabel.getAttributeName(), referencingVertex, - referencingType.getTypeCategory(), instanceId); - } - // If the referencing instance is an entity, update the modification timestamp. - if (referencingType instanceof ClassType) { - GraphHelper.setProperty(referencingVertex, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, System.currentTimeMillis()); - } - } - else { - // The expected value is missing from the reference property values - log a warning. - String instanceId = getInstanceName(referencingVertex, referencingType); - LOG.warn("Reference value {} expected but not found in array reference {} on vertex {} for instance of {} {}", - refValueRemoved.left, atlasEdgeLabel.getAttributeName(), referencingVertex, - referencingType.getTypeCategory(), instanceId); - } - return refValueRemoved; - } - - private TypeUtils.Pair removeArrayReferenceValue(AtlasEdgeLabel atlasEdgeLabel, Vertex referencingVertex, - Edge edge, List newRefValues) { - - String refValueToRemove = edge.getId().toString(); - boolean valueRemoved = newRefValues.remove(refValueToRemove); - if (valueRemoved) { - GraphHelper.setProperty(referencingVertex, atlasEdgeLabel.getQualifiedAttributeName(), newRefValues); - } - return new TypeUtils.Pair(refValueToRemove, Boolean.valueOf(valueRemoved)); - } - - private TypeUtils.Pair removeMapReferenceValue(AtlasEdgeLabel atlasEdgeLabel, Vertex referencingVertex, - Edge edge, List newRefValues) throws AtlasException { - - String refValueToRemove = atlasEdgeLabel.getMapKey(); - if (refValueToRemove == null) { - // Edge label is missing the map key - throw an exception. - String typeName = atlasEdgeLabel.getTypeName(); - throw new AtlasException("Outgoing edge " + edge.getId().toString() - + " for vertex " + referencingVertex + "): label " + atlasEdgeLabel.getEdgeLabel() - + " for map attribute " + atlasEdgeLabel.getAttributeName() + " on type " - + typeName + " is missing the map key"); - } - boolean valueRemoved = newRefValues.remove(refValueToRemove); - if (valueRemoved) { - GraphHelper.setProperty(referencingVertex, atlasEdgeLabel.getQualifiedAttributeName(), newRefValues); - // For maps, also remove the key-value pair property value. - GraphHelper.setProperty(referencingVertex, atlasEdgeLabel.getQualifiedMapKey(), null); - } - return new TypeUtils.Pair(refValueToRemove, Boolean.valueOf(valueRemoved)); - } - - void deleteEntity(String typeName, Vertex instanceVertex) throws AtlasException { - // Check if this entity has already been processed. - Id id = GraphHelper.getIdFromVertex(typeName, instanceVertex); - if (deletedEntityGuids.contains(id._getId())) { - return; - } - deletedEntityGuids.add(id._getId()); - - // Remove traits owned by this entity. - deleteAllTraits(instanceVertex); - - // Create an empty instance to use for clearing all attributes. - ClassType classType = typeSystem.getDataType(ClassType.class, typeName); - ITypedReferenceableInstance typedInstance = classType.createInstance(id); - - // Remove any underlying structs and composite entities owned by this entity. - mapInstanceToVertex(typedInstance, instanceVertex, classType.fieldMapping().fields, false, Operation.DELETE); - deletedEntities.add(typedInstance); - } - - /** - * Delete all traits from the specified vertex. - * - * @param instanceVertex - * @throws AtlasException - */ - private void deleteAllTraits(Vertex instanceVertex) throws AtlasException { - List traitNames = GraphHelper.getTraitNames(instanceVertex); - final String entityTypeName = GraphHelper.getTypeName(instanceVertex); - for (String traitNameToBeDeleted : traitNames) { - String relationshipLabel = GraphHelper.getTraitLabel(entityTypeName, traitNameToBeDeleted); - Iterator results = instanceVertex.getEdges(Direction.OUT, relationshipLabel).iterator(); - if (results.hasNext()) { // there should only be one edge for this label - final Edge traitEdge = results.next(); - final Vertex traitVertex = traitEdge.getVertex(Direction.IN); - - // remove the edge to the trait instance from the repository - graphHelper.removeEdge(traitEdge); - - if (traitVertex != null) { // remove the trait instance from the repository - deleteTraitVertex(traitNameToBeDeleted, traitVertex); - } - } - } - } - - void deleteTraitVertex(String traitName, final Vertex traitVertex) throws AtlasException { - - TraitType traitType = typeSystem.getDataType(TraitType.class, traitName); - ITypedStruct traitStruct = traitType.createInstance(); - - // Remove trait vertex along with any struct and class attributes owned by this trait. - mapInstanceToVertex(traitStruct, traitVertex, traitType.fieldMapping().fields, false, Operation.DELETE); - } - - - /** - * Get the GUIDs of entities that have been deleted. - * - * @return - */ - List getDeletedEntityGuids() { - if (deletedEntityGuids.size() == 0) { - return Collections.emptyList(); - } - else { - return Collections.unmodifiableList(deletedEntityGuids); - } - } - - /** - * Get the entities that have been deleted. - * - * @return - */ - List getDeletedEntities() { - if (deletedEntities.size() == 0) { - return Collections.emptyList(); - } - else { - return Collections.unmodifiableList(deletedEntities); - } - } - } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/40ee9492/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java b/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java index 71e9028..3fb128c 100755 --- a/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java @@ -206,9 +206,10 @@ public class GraphBackedTypeStore implements ITypeStore { } private void addEdge(Vertex fromVertex, Vertex toVertex, String label) { - Iterable edges = GraphHelper.getOutGoingEdgesByLabel(fromVertex, label); + Iterator edges = GraphHelper.getOutGoingEdgesByLabel(fromVertex, label); // ATLAS-474: Check if this type system edge already exists, to avoid duplicates. - for (Edge edge : edges) { + while (edges.hasNext()) { + Edge edge = edges.next(); if (edge.getVertex(Direction.IN).equals(toVertex)) { LOG.debug("Edge from {} to {} with label {} already exists", toString(fromVertex), toString(toVertex), label); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/40ee9492/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java b/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java index d07f89d..66e1365 100644 --- a/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java +++ b/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java @@ -69,6 +69,7 @@ public class BaseHiveRepositoryTest { protected void setUp() throws Exception { setUpTypes(); new GraphBackedSearchIndexer(graphProvider); + RequestContext.createContext(); setupInstances(); TestUtils.dumpGraph(graphProvider.get()); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/40ee9492/repository/src/test/java/org/apache/atlas/TestUtils.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/TestUtils.java b/repository/src/test/java/org/apache/atlas/TestUtils.java index a3cf929..345e874 100755 --- a/repository/src/test/java/org/apache/atlas/TestUtils.java +++ b/repository/src/test/java/org/apache/atlas/TestUtils.java @@ -21,14 +21,12 @@ package org.apache.atlas; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.thinkaurelius.titan.core.TitanGraph; -import com.tinkerpop.blueprints.Edge; -import com.tinkerpop.blueprints.Vertex; import com.tinkerpop.blueprints.util.io.graphson.GraphSONWriter; - import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.typesystem.ITypedReferenceableInstance; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.TypesDef; +import org.apache.atlas.typesystem.persistence.Id; import org.apache.atlas.typesystem.types.AttributeDefinition; import org.apache.atlas.typesystem.types.ClassType; import org.apache.atlas.typesystem.types.DataTypes; @@ -46,12 +44,14 @@ import org.testng.Assert; import java.io.File; import java.util.Collection; +import java.util.Date; import static org.apache.atlas.typesystem.types.utils.TypesUtil.createClassTypeDef; import static org.apache.atlas.typesystem.types.utils.TypesUtil.createOptionalAttrDef; import static org.apache.atlas.typesystem.types.utils.TypesUtil.createRequiredAttrDef; import static org.apache.atlas.typesystem.types.utils.TypesUtil.createStructTypeDef; import static org.apache.atlas.typesystem.types.utils.TypesUtil.createTraitTypeDef; +import static org.apache.atlas.typesystem.types.utils.TypesUtil.createUniqueRequiredAttrDef; /** * Test utility class. @@ -75,16 +75,7 @@ public final class TestUtils { System.out.println("tempFile.getPath() = " + tempFile.getPath()); GraphSONWriter.outputGraph(titanGraph, tempFile.getPath()); - System.out.println("Vertices:"); - for (Vertex vertex : titanGraph.getVertices()) { - System.out.println(GraphHelper.vertexString(vertex)); - } - - System.out.println("Edges:"); - for (Edge edge : titanGraph.getEdges()) { - System.out.println(GraphHelper.edgeString(edge)); - } - + GraphHelper.dumpToLog(titanGraph); return tempFile.getPath(); } @@ -106,9 +97,9 @@ public final class TestUtils { createStructTypeDef("Address", "Address"+_description, createRequiredAttrDef("street", DataTypes.STRING_TYPE), createRequiredAttrDef("city", DataTypes.STRING_TYPE)); - HierarchicalTypeDefinition deptTypeDef = createClassTypeDef("Department", "Department"+_description, ImmutableSet.of(), + HierarchicalTypeDefinition deptTypeDef = createClassTypeDef(DEPARTMENT_TYPE, "Department"+_description, ImmutableSet.of(), createRequiredAttrDef("name", DataTypes.STRING_TYPE), - new AttributeDefinition("employees", String.format("array<%s>", "Person"), Multiplicity.COLLECTION, + new AttributeDefinition("employees", String.format("array<%s>", "Person"), Multiplicity.OPTIONAL, true, "department")); HierarchicalTypeDefinition personTypeDef = createClassTypeDef("Person", "Person"+_description, ImmutableSet.of(), @@ -132,9 +123,13 @@ public final class TestUtils { ImmutableList.of(deptTypeDef, personTypeDef, managerTypeDef)); } - public static Referenceable createDeptEg1(TypeSystem ts) throws AtlasException { - Referenceable hrDept = new Referenceable(ENTITY_TYPE); - Referenceable john = new Referenceable("Person"); + public static final String DEPARTMENT_TYPE = "Department"; + public static final String PERSON_TYPE = "Person"; + + public static ITypedReferenceableInstance createDeptEg1(TypeSystem ts) throws AtlasException { + Referenceable hrDept = new Referenceable(DEPARTMENT_TYPE); + Referenceable john = new Referenceable(PERSON_TYPE); + Referenceable jane = new Referenceable("Manager", "SecurityClearance"); Referenceable johnAddr = new Referenceable("Address"); Referenceable janeAddr = new Referenceable("Address"); @@ -183,13 +178,13 @@ public final class TestUtils { ITypedReferenceableInstance hrDept2 = deptType.convert(hrDept, Multiplicity.REQUIRED); Assert.assertNotNull(hrDept2); - return hrDept; + return hrDept2; } - public static final String ENTITY_TYPE = "Department"; public static final String DATABASE_TYPE = "hive_database"; public static final String DATABASE_NAME = "foo"; public static final String TABLE_TYPE = "hive_table"; + public static final String PROCESS_TYPE = "hive_process"; public static final String COLUMN_TYPE = "column_type"; public static final String TABLE_NAME = "bar"; public static final String CLASSIFICATION = "classification"; @@ -200,6 +195,9 @@ public final class TestUtils { public static final String PARTITION_CLASS_TYPE = "partition_class_type"; public static final String SERDE_TYPE = "serdeType"; public static final String COLUMNS_MAP = "columnsMap"; + public static final String COLUMNS_ATTR_NAME = "columns"; + + public static final String NAME = "name"; public static TypesDef defineHiveTypes() { String _description = "_description"; @@ -211,7 +209,7 @@ public final class TestUtils { HierarchicalTypeDefinition databaseTypeDefinition = createClassTypeDef(DATABASE_TYPE, DATABASE_TYPE + _description,ImmutableSet.of(SUPER_TYPE_NAME), - TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE), + TypesUtil.createUniqueRequiredAttrDef(NAME, DataTypes.STRING_TYPE), createOptionalAttrDef("created", DataTypes.DATE_TYPE), createRequiredAttrDef("description", DataTypes.STRING_TYPE)); @@ -227,7 +225,7 @@ public final class TestUtils { HierarchicalTypeDefinition columnsDefinition = createClassTypeDef(COLUMN_TYPE, ImmutableSet.of(), - createRequiredAttrDef("name", DataTypes.STRING_TYPE), + createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE), createRequiredAttrDef("type", DataTypes.STRING_TYPE)); StructTypeDefinition partitionDefinition = new StructTypeDefinition("partition_struct_type", "partition_struct_type" + _description, @@ -268,6 +266,12 @@ public final class TestUtils { new HierarchicalTypeDefinition<>(ClassType.class, "partition_class_type", "partition_class_type" + _description, ImmutableSet.of(SUPER_TYPE_NAME), partClsAttributes); + HierarchicalTypeDefinition processClsType = + new HierarchicalTypeDefinition<>(ClassType.class, PROCESS_TYPE, PROCESS_TYPE + _description, + ImmutableSet.of(), new AttributeDefinition[]{ + new AttributeDefinition("outputs", "array<" + TABLE_TYPE + ">", Multiplicity.OPTIONAL, false, null) + }); + HierarchicalTypeDefinition tableTypeDefinition = createClassTypeDef(TABLE_TYPE, TABLE_TYPE + _description, ImmutableSet.of(SUPER_TYPE_NAME), TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE), @@ -322,7 +326,8 @@ public final class TestUtils { return TypesUtil.getTypesDef(ImmutableList.of(enumTypeDefinition), ImmutableList.of(structTypeDefinition, partitionDefinition), ImmutableList.of(classificationTypeDefinition, fetlClassificationTypeDefinition, piiTypeDefinition), - ImmutableList.of(superTypeDefinition, databaseTypeDefinition, columnsDefinition, tableTypeDefinition, storageDescClsDef, partClsDef)); + ImmutableList.of(superTypeDefinition, databaseTypeDefinition, columnsDefinition, tableTypeDefinition, + storageDescClsDef, partClsDef, processClsType)); } public static Collection createHiveTypes(TypeSystem typeSystem) throws Exception { @@ -336,4 +341,31 @@ public final class TestUtils { public static final String randomString() { return RandomStringUtils.randomAlphanumeric(10); } + + public static Referenceable createDBEntity() { + Referenceable entity = new Referenceable(DATABASE_TYPE); + String dbName = RandomStringUtils.randomAlphanumeric(10); + entity.set(NAME, dbName); + entity.set("description", "us db"); + return entity; + } + + public static Referenceable createTableEntity(String dbId) { + Referenceable entity = new Referenceable(TABLE_TYPE); + String tableName = RandomStringUtils.randomAlphanumeric(10); + entity.set(NAME, tableName); + entity.set("description", "random table"); + entity.set("type", "type"); + entity.set("tableType", "MANAGED"); + entity.set("database", new Id(dbId, 0, DATABASE_TYPE)); + entity.set("created", new Date()); + return entity; + } + + public static Referenceable createColumnEntity() { + Referenceable entity = new Referenceable(COLUMN_TYPE); + entity.set(NAME, RandomStringUtils.randomAlphanumeric(10)); + entity.set("type", "VARCHAR(32)"); + return entity; + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/40ee9492/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java b/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java index ea93cbf..06ac492 100755 --- a/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java @@ -21,6 +21,7 @@ package org.apache.atlas.discovery; import com.google.common.collect.ImmutableSet; import org.apache.atlas.BaseHiveRepositoryTest; import org.apache.atlas.RepositoryMetadataModule; +import org.apache.atlas.RequestContext; import org.apache.atlas.TestUtils; import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService; import org.apache.atlas.repository.Constants; @@ -38,6 +39,7 @@ import org.codehaus.jettison.json.JSONObject; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Guice; import org.testng.annotations.Test; @@ -67,11 +69,8 @@ public class GraphBackedDiscoveryServiceTest extends BaseHiveRepositoryTest { TypeSystem typeSystem = TypeSystem.getInstance(); TestUtils.defineDeptEmployeeTypes(typeSystem); - Referenceable hrDept = TestUtils.createDeptEg1(typeSystem); - ClassType deptType = typeSystem.getDataType(ClassType.class, "Department"); - ITypedReferenceableInstance hrDept2 = deptType.convert(hrDept, Multiplicity.REQUIRED); - - repositoryService.createEntities(hrDept2); + ITypedReferenceableInstance hrDept = TestUtils.createDeptEg1(typeSystem); + repositoryService.createEntities(hrDept); ITypedReferenceableInstance jane = repositoryService.getEntityDefinition("Person", "name", "Jane"); Id janeGuid = jane.getId(); @@ -81,6 +80,11 @@ public class GraphBackedDiscoveryServiceTest extends BaseHiveRepositoryTest { repositoryService.updateEntities(instance); } + @BeforeMethod + public void setupContext() { + RequestContext.createContext(); + } + @AfterClass public void tearDown() throws Exception { super.tearDown();