falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From venkat...@apache.org
Subject [1/2] git commit: FALCON-334 Add indexing to the graph property keys. Contributed by Venkatesh Seetharam
Date Tue, 11 Mar 2014 00:07:32 GMT
Repository: incubator-falcon
Updated Branches:
  refs/heads/master 5445e109b -> 6b3288621


FALCON-334 Add indexing to the graph property keys. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/6b328862
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/6b328862
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/6b328862

Branch: refs/heads/master
Commit: 6b3288621b2186abdf243d498a78df5f8c558caf
Parents: 3158c03
Author: Venkatesh Seetharam <venkatesh@hortonworks.com>
Authored: Mon Mar 10 16:11:21 2014 -0700
Committer: Venkatesh Seetharam <venkatesh@hortonworks.com>
Committed: Mon Mar 10 17:07:37 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../EntityRelationshipGraphBuilder.java         |  24 ++-
 .../org/apache/falcon/metadata/GraphUtils.java  |  10 +-
 .../InstanceRelationshipGraphBuilder.java       |  12 +-
 .../apache/falcon/metadata/LineageRecorder.java |   8 +
 .../falcon/metadata/MetadataMappingService.java |  97 +++++++++---
 .../metadata/RelationshipGraphBuilder.java      |  43 +++---
 .../metadata/MetadataMappingServiceTest.java    |  42 +++---
 .../falcon/workflow/FalconPostProcessing.java   |  10 +-
 .../metadata/LineageMetadataResource.java       | 146 +++++++++----------
 .../falcon/service/FalconTopicSubscriber.java   |   2 +-
 11 files changed, 230 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6b328862/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 64bb353..7b15769 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -69,6 +69,8 @@ Trunk (Unreleased)
     FALCON-257 File system storage wont work with relative paths
     (Venkatesh Seetharam)
 
+    FALCON-334 Add indexing to the graph property keys. (Venkatesh Seetharam)
+
   OPTIMIZATIONS
     FALCON-123 Improve build speeds in falcon. (Srikanth Sundarrajan via Shwetha GS)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6b328862/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
index 6182dbe..4e94f61 100644
--- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
@@ -18,7 +18,7 @@
 
 package org.apache.falcon.metadata;
 
-import com.tinkerpop.blueprints.KeyIndexableGraph;
+import com.tinkerpop.blueprints.Graph;
 import com.tinkerpop.blueprints.Vertex;
 import org.apache.falcon.entity.ProcessHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
@@ -47,7 +47,7 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder
{
     public static final String PROCESS_ENTITY_TYPE = "process-entity";
 
 
-    public EntityRelationshipGraphBuilder(KeyIndexableGraph graph, boolean preserveHistory) {
+    public EntityRelationshipGraphBuilder(Graph graph, boolean preserveHistory) {
         super(graph, preserveHistory);
     }
 
@@ -77,7 +77,9 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder
{
         LOG.info("Updating feed entity: " + newFeed.getName());
         Vertex feedEntityVertex = findVertex(oldFeed.getName(), FEED_ENTITY_TYPE);
         if (feedEntityVertex == null) {
-            throw new IllegalStateException(oldFeed.getName() + " entity vertex must exist.");
+            // todo - throw new IllegalStateException(oldFeed.getName() + " entity vertex must exist.");
+            LOG.error("Illegal State: Feed entity vertex must exist for " + oldFeed.getName());
+            return;
         }
 
         updateDataClassification(oldFeed.getTags(), newFeed.getTags(), feedEntityVertex);
@@ -108,7 +110,9 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder
{
         LOG.info("Updating process entity: " + newProcess.getName());
         Vertex processEntityVertex = findVertex(oldProcess.getName(), PROCESS_ENTITY_TYPE);
         if (processEntityVertex == null) {
-            throw new IllegalStateException(oldProcess.getName() + " entity vertex must exist.");
+            // todo - throw new IllegalStateException(oldProcess.getName() + " entity vertex must exist");
+            LOG.error("Illegal State: Process entity vertex must exist for " + oldProcess.getName());
+            return;
         }
 
         updateWorkflowProperties(oldProcess.getWorkflow(), newProcess.getWorkflow(),
@@ -128,7 +132,9 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder
{
     public void addRelationToCluster(Vertex fromVertex, String clusterName, String edgeLabel)
{
         Vertex clusterVertex = findVertex(clusterName, CLUSTER_ENTITY_TYPE);
         if (clusterVertex == null) { // cluster must exist before adding other entities
-            throw new IllegalStateException("Cluster entity vertex must exist: " + clusterName);
+            // todo - throw new IllegalStateException("Cluster entity vertex must exist: " + clusterName);
+            LOG.error("Illegal State: Cluster entity vertex must exist for " + clusterName);
+            return;
         }
 
         addEdge(fromVertex, clusterVertex, edgeLabel);
@@ -157,7 +163,9 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder
{
     public void addProcessFeedEdge(Vertex processVertex, String feedName, String edgeLabel)
{
         Vertex feedVertex = findVertex(feedName, FEED_ENTITY_TYPE);
         if (feedVertex == null) {
-            throw new IllegalStateException("Feed entity vertex must exist: " + feedName);
+            // todo - throw new IllegalStateException("Feed entity vertex must exist: " + feedName);
+            LOG.error("Illegal State: Feed entity vertex must exist for " + feedName);
+            return;
         }
 
         addProcessFeedEdge(processVertex, feedVertex, edgeLabel);
@@ -366,7 +374,9 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder
{
     public void removeProcessFeedEdge(Vertex processVertex, String feedName, String edgeLabel)
{
         Vertex feedVertex = findVertex(feedName, FEED_ENTITY_TYPE);
         if (feedVertex == null) {
-            throw new IllegalStateException("Feed entity vertex must exist: " + feedName);
+            // todo - throw new IllegalStateException("Feed entity vertex must exist: " + feedName);
+            LOG.error("Illegal State: Feed entity vertex must exist for " + feedName);
+            return;
         }
 
         if (edgeLabel.equals(FEED_PROCESS_EDGE_LABEL)) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6b328862/common/src/main/java/org/apache/falcon/metadata/GraphUtils.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/GraphUtils.java b/common/src/main/java/org/apache/falcon/metadata/GraphUtils.java
index e98e44f..24bf30f 100644
--- a/common/src/main/java/org/apache/falcon/metadata/GraphUtils.java
+++ b/common/src/main/java/org/apache/falcon/metadata/GraphUtils.java
@@ -20,7 +20,7 @@ package org.apache.falcon.metadata;
 
 import com.tinkerpop.blueprints.Direction;
 import com.tinkerpop.blueprints.Edge;
-import com.tinkerpop.blueprints.KeyIndexableGraph;
+import com.tinkerpop.blueprints.Graph;
 import com.tinkerpop.blueprints.Vertex;
 import com.tinkerpop.blueprints.util.io.graphson.GraphSONWriter;
 import org.apache.log4j.Logger;
@@ -38,7 +38,7 @@ public final class GraphUtils {
     private GraphUtils() {
     }
 
-    public static void dumpToLog(final KeyIndexableGraph graph) {
+    public static void dumpToLog(final Graph graph) {
         LOG.debug("Vertices of " + graph);
         for (Vertex vertex : graph.getVertices()) {
             LOG.debug(vertexString(vertex));
@@ -50,15 +50,15 @@ public final class GraphUtils {
         }
     }
 
-    public static void dump(final KeyIndexableGraph graph) throws IOException {
+    public static void dump(final Graph graph) throws IOException {
         dump(graph, System.out);
     }
 
-    public static void dump(final KeyIndexableGraph graph, OutputStream outputStream) throws IOException {
+    public static void dump(final Graph graph, OutputStream outputStream) throws IOException {
         GraphSONWriter.outputGraph(graph, outputStream);
     }
 
-    public static void dump(final KeyIndexableGraph graph, String fileName) throws IOException {
+    public static void dump(final Graph graph, String fileName) throws IOException {
         GraphSONWriter.outputGraph(graph, fileName);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6b328862/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index 11e8f86..0b121f8 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -18,7 +18,7 @@
 
 package org.apache.falcon.metadata;
 
-import com.tinkerpop.blueprints.KeyIndexableGraph;
+import com.tinkerpop.blueprints.Graph;
 import com.tinkerpop.blueprints.Vertex;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.CatalogStorage;
@@ -27,10 +27,10 @@ import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.common.FeedDataPath;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.Process;
 import org.apache.log4j.Logger;
 
 import java.net.URISyntaxException;
@@ -61,7 +61,7 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder
{
     // instance edge labels
     public static final String INSTANCE_ENTITY_EDGE_LABEL = "instance-of";
 
-    public InstanceRelationshipGraphBuilder(KeyIndexableGraph graph, boolean preserveHistory) {
+    public InstanceRelationshipGraphBuilder(Graph graph, boolean preserveHistory) {
         super(graph, preserveHistory);
     }
 
@@ -108,7 +108,9 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder
{
         Vertex entityVertex = findVertex(entityName, entityType);
         LOG.info("Vertex exists? name=" + entityName + ", type=" + entityType + ", v=" + entityVertex);
         if (entityVertex == null) {
-            throw new IllegalStateException(entityType + " entity vertex must exist " + entityName);
+            // todo - throw new IllegalStateException(entityType + " entity vertex must exist " + entityName);
+            LOG.error("Illegal State: " + entityType + " vertex must exist for " + entityName);
+            return;
         }
 
         addEdge(instanceVertex, entityVertex, edgeLabel);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6b328862/common/src/main/java/org/apache/falcon/metadata/LineageRecorder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/LineageRecorder.java b/common/src/main/java/org/apache/falcon/metadata/LineageRecorder.java
index ae9d88a..9f6965a 100644
--- a/common/src/main/java/org/apache/falcon/metadata/LineageRecorder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/LineageRecorder.java
@@ -106,6 +106,14 @@ public class LineageRecorder  extends Configured implements Tool {
         return logDir + entityName + "-lineage.json";
     }
 
+    /**
+     * This method is invoked from within the workflow.
+     *
+     * @param lineageMetadata metadata to persist
+     * @param lineageFile file to serialize the metadata
+     * @throws IOException
+     * @throws FalconException
+     */
     protected void persistLineageMetadata(Map<String, String> lineageMetadata,
                                           String lineageFile) throws IOException, FalconException
{
         OutputStream out = null;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6b328862/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index 75f7f88..3a6db91 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -18,9 +18,12 @@
 
 package org.apache.falcon.metadata;
 
+import com.thinkaurelius.titan.graphdb.blueprints.TitanBlueprintsGraph;
 import com.tinkerpop.blueprints.Edge;
+import com.tinkerpop.blueprints.Graph;
 import com.tinkerpop.blueprints.GraphFactory;
 import com.tinkerpop.blueprints.KeyIndexableGraph;
+import com.tinkerpop.blueprints.TransactionalGraph;
 import com.tinkerpop.blueprints.Vertex;
 import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
@@ -48,13 +51,6 @@ public class MetadataMappingService implements FalconService, ConfigurationChang
     private static final Logger LOG = Logger.getLogger(MetadataMappingService.class);
 
     /**
-     * Entity operations.
-     */
-    public enum EntityOperations {
-        GENERATE, REPLICATE, DELETE
-    }
-
-    /**
      * Constance for the service name.
      */
     public static final String SERVICE_NAME = MetadataMappingService.class.getSimpleName();
@@ -64,7 +60,8 @@ public class MetadataMappingService implements FalconService, ConfigurationChang
      */
     private static final String FALCON_PREFIX = "falcon.graph.";
 
-    private KeyIndexableGraph graph;
+
+    private Graph graph;
     private Set<String> vertexIndexedKeys;
     private Set<String> edgeIndexedKeys;
     private EntityRelationshipGraphBuilder entityGraphBuilder;
@@ -78,12 +75,14 @@ public class MetadataMappingService implements FalconService, ConfigurationChang
     @Override
     public void init() throws FalconException {
         graph = initializeGraphDB();
+        createIndicesForVertexKeys();
+        // todo - create Edge Cardinality Constraints
         LOG.info("Initialized graph db: " + graph);
 
-        vertexIndexedKeys = graph.getIndexedKeys(Vertex.class);
+        vertexIndexedKeys = getIndexableGraph().getIndexedKeys(Vertex.class);
         LOG.info("Init vertex property keys: " + vertexIndexedKeys);
 
-        edgeIndexedKeys = graph.getIndexedKeys(Edge.class);
+        edgeIndexedKeys = getIndexableGraph().getIndexedKeys(Edge.class);
         LOG.info("Init edge property keys: " + edgeIndexedKeys);
 
         boolean preserveHistory = Boolean.valueOf(StartupProperties.get().getProperty(
@@ -94,16 +93,14 @@ public class MetadataMappingService implements FalconService, ConfigurationChang
         ConfigurationStore.get().registerListener(this);
     }
 
-    protected KeyIndexableGraph initializeGraphDB() {
+    protected Graph initializeGraphDB() {
         LOG.info("Initializing graph db");
 
         Configuration graphConfig = getConfiguration();
-        KeyIndexableGraph graphDB = (KeyIndexableGraph) GraphFactory.open(graphConfig);
-        createIndicesForVertexKeys(graphDB);
-        return graphDB;
+        return GraphFactory.open(graphConfig);
     }
 
-    public Configuration getConfiguration() {
+    public static Configuration getConfiguration() {
         Configuration graphConfig = new BaseConfiguration();
 
         Properties configProperties = StartupProperties.get();
@@ -119,19 +116,58 @@ public class MetadataMappingService implements FalconService, ConfigurationChang
         return graphConfig;
     }
 
-    protected void createIndicesForVertexKeys(KeyIndexableGraph graphDB) {
-        LOG.info("Creating indexes for graph");
+    /**
+     * This unfortunately requires a handle to Titan implementation since
+     * com.tinkerpop.blueprints.KeyIndexableGraph#createKeyIndex does not create an index.
+     */
+    protected void createIndicesForVertexKeys() {
+        if (!((KeyIndexableGraph) graph).getIndexedKeys(Vertex.class).isEmpty()) {
+            LOG.info("Indexes already exist for graph");
+            return;
+        }
+
+        LOG.info("Indexes does not exist, Creating indexes for graph");
         // todo - externalize this
-        graphDB.createKeyIndex(RelationshipGraphBuilder.NAME_PROPERTY_KEY, Vertex.class);
-        graphDB.createKeyIndex(RelationshipGraphBuilder.TYPE_PROPERTY_KEY, Vertex.class);
-        graphDB.createKeyIndex(RelationshipGraphBuilder.VERSION_PROPERTY_KEY, Vertex.class);
-        graphDB.createKeyIndex(RelationshipGraphBuilder.TIMESTAMP_PROPERTY_KEY, Vertex.class);
+        makeNameKeyIndex();
+        makeKeyIndex(RelationshipGraphBuilder.TYPE_PROPERTY_KEY);
+        makeKeyIndex(RelationshipGraphBuilder.TIMESTAMP_PROPERTY_KEY);
+        makeKeyIndex(RelationshipGraphBuilder.VERSION_PROPERTY_KEY);
     }
 
-    public KeyIndexableGraph getGraph() {
+    private void makeNameKeyIndex() {
+        getTitanGraph().makeKey(RelationshipGraphBuilder.NAME_PROPERTY_KEY)
+                .dataType(String.class)
+                .indexed(Vertex.class)
+                .indexed(Edge.class)
+                // .unique() todo this ought to be unique?
+                .make();
+        getTitanGraph().commit();
+    }
+
+    private void makeKeyIndex(String key) {
+        getTitanGraph().makeKey(key)
+                .dataType(String.class)
+                .indexed(Vertex.class)
+                .make();
+        getTitanGraph().commit();
+    }
+
+    public Graph getGraph() {
         return graph;
     }
 
+    public KeyIndexableGraph getIndexableGraph() {
+        return (KeyIndexableGraph) graph;
+    }
+
+    public TransactionalGraph getTransactionalGraph() {
+        return (TransactionalGraph) graph;
+    }
+
+    public TitanBlueprintsGraph getTitanGraph() {
+        return (TitanBlueprintsGraph) graph;
+    }
+
     public Set<String> getVertexIndexedKeys() {
         return vertexIndexedKeys;
     }
@@ -154,14 +190,17 @@ public class MetadataMappingService implements FalconService, ConfigurationChang
         switch (entityType) {
         case CLUSTER:
             entityGraphBuilder.addClusterEntity((Cluster) entity);
+            getTransactionalGraph().commit();
             break;
 
         case FEED:
             entityGraphBuilder.addFeedEntity((Feed) entity);
+            getTransactionalGraph().commit();
             break;
 
         case PROCESS:
             entityGraphBuilder.addProcessEntity((Process) entity);
+            getTransactionalGraph().commit();
             break;
 
         default:
@@ -186,10 +225,12 @@ public class MetadataMappingService implements FalconService, ConfigurationChang
 
         case FEED:
             entityGraphBuilder.updateFeedEntity((Feed) oldEntity, (Feed) newEntity);
+            getTransactionalGraph().commit();
             break;
 
         case PROCESS:
             entityGraphBuilder.updateProcessEntity((Process) oldEntity, (Process) newEntity);
+            getTransactionalGraph().commit();
             break;
 
         default:
@@ -202,8 +243,15 @@ public class MetadataMappingService implements FalconService, ConfigurationChang
         // are already added to the graph
     }
 
-    public void mapLineage(String entityName, String operation,
-                           String logDir) throws FalconException {
+    /**
+     * Entity operations.
+     */
+    public enum EntityOperations {
+        GENERATE, REPLICATE, DELETE
+    }
+
+    public void onSuccessfulWorkflowCompletion(String entityName, String operation,
+                                               String logDir) throws FalconException {
         String lineageFile = LineageRecorder.getFilePath(logDir, entityName);
 
         LOG.info("Parsing lineage metadata from: " + lineageFile);
@@ -215,6 +263,7 @@ public class MetadataMappingService implements FalconService, ConfigurationChang
         switch (entityOperation) {
         case GENERATE:
             onProcessInstanceAdded(lineageMetadata);
+            getTransactionalGraph().commit();
             break;
 
         case REPLICATE:

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6b328862/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
index 1f607a6..5c0f8cb 100644
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
@@ -20,8 +20,8 @@ package org.apache.falcon.metadata;
 
 import com.tinkerpop.blueprints.Direction;
 import com.tinkerpop.blueprints.Edge;
+import com.tinkerpop.blueprints.Graph;
 import com.tinkerpop.blueprints.GraphQuery;
-import com.tinkerpop.blueprints.KeyIndexableGraph;
 import com.tinkerpop.blueprints.Vertex;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.log4j.Logger;
@@ -65,7 +65,7 @@ public abstract class RelationshipGraphBuilder {
     /**
      * A blueprints graph.
      */
-    private final KeyIndexableGraph graph;
+    private final Graph graph;
 
     /**
      * If enabled, preserves history of tags and groups for instances else will only
@@ -73,12 +73,12 @@ public abstract class RelationshipGraphBuilder {
      */
     private final boolean preserveHistory;
 
-    protected RelationshipGraphBuilder(KeyIndexableGraph graph, boolean preserveHistory) {
+    protected RelationshipGraphBuilder(Graph graph, boolean preserveHistory) {
         this.graph = graph;
         this.preserveHistory = preserveHistory;
     }
 
-    protected KeyIndexableGraph getGraph() {
+    public Graph getGraph() {
         return graph;
     }
 
@@ -121,7 +121,7 @@ public abstract class RelationshipGraphBuilder {
                 .has(NAME_PROPERTY_KEY, name)
                 .has(TYPE_PROPERTY_KEY, type);
         Iterator<Vertex> results = query.vertices().iterator();
-        return results.hasNext() ? results.next() : null;
+        return results.hasNext() ? results.next() : null;  // returning one since name is unique
     }
 
     protected Vertex createVertex(String name, String type) {
@@ -142,37 +142,38 @@ public abstract class RelationshipGraphBuilder {
     }
 
     protected Edge addEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) {
-        Edge edge = findEdge(fromVertex, edgeLabel);
-        return edgeExists(edge, toVertex) ? edge : fromVertex.addEdge(edgeLabel, toVertex);
+        Edge edge = findEdge(fromVertex, toVertex, edgeLabel);
+        return edge != null ? edge : fromVertex.addEdge(edgeLabel, toVertex);
     }
 
     protected void removeEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) {
-        Edge edge = findEdge(fromVertex, edgeLabel);
-        if (edgeExists(edge, toVertex)) {
+        Edge edge = findEdge(fromVertex, toVertex, edgeLabel);
+        if (edge != null) {
             getGraph().removeEdge(edge);
         }
     }
 
     protected void removeEdge(Vertex fromVertex, Object toVertexName, String edgeLabel) {
-        Edge edge = findEdge(fromVertex, edgeLabel);
-        if (edgeExists(edge, toVertexName)) {
+        Edge edge = findEdge(fromVertex, toVertexName, edgeLabel);
+        if (edge != null) {
             getGraph().removeEdge(edge);
         }
     }
 
-    protected boolean edgeExists(Edge edge, Object toVertexName) {
-        return edge != null && edge.getVertex(Direction.IN).getProperty(NAME_PROPERTY_KEY)
-                .equals(toVertexName);
+    protected Edge findEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) {
+        return findEdge(fromVertex, toVertex.getProperty(NAME_PROPERTY_KEY), edgeLabel);
     }
 
-    protected boolean edgeExists(Edge edge, Vertex toVertex) {
-        return edge != null && edge.getVertex(Direction.IN).getProperty(NAME_PROPERTY_KEY)
-                .equals(toVertex.getProperty(NAME_PROPERTY_KEY));
-    }
+    protected Edge findEdge(Vertex fromVertex, Object toVertexName, String edgeLabel) {
+        Edge edgeToFind = null;
+        for (Edge edge : fromVertex.getEdges(Direction.OUT, edgeLabel)) {
+            if (edge.getVertex(Direction.IN).getProperty(NAME_PROPERTY_KEY).equals(toVertexName)) {
+                edgeToFind = edge;
+                break;
+            }
+        }
 
-    protected Edge findEdge(Vertex fromVertex, String edgeLabel) {
-        Iterator<Edge> edges = fromVertex.getEdges(Direction.OUT, edgeLabel).iterator();
-        return edges.hasNext() ? edges.next() : null;
+        return edgeToFind;
     }
 
     protected void addUserRelation(Vertex fromVertex) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6b328862/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 7045636..3c5f69d 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -20,8 +20,8 @@ package org.apache.falcon.metadata;
 
 import com.tinkerpop.blueprints.Direction;
 import com.tinkerpop.blueprints.Edge;
+import com.tinkerpop.blueprints.Graph;
 import com.tinkerpop.blueprints.GraphQuery;
-import com.tinkerpop.blueprints.KeyIndexableGraph;
 import com.tinkerpop.blueprints.Vertex;
 import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
@@ -216,10 +216,13 @@ public class MetadataMappingServiceTest {
 
     @Test(dependsOnMethods = "testOnAdd")
     public void testMapLineage() throws Exception {
+        // shutdown the graph and resurrect for testing
+        service.destroy();
+        service.init();
 
         LineageRecorder.main(getTestMessageArgs());
 
-        service.mapLineage(PROCESS_ENTITY_NAME, OPERATION, LOGS_DIR);
+        service.onSuccessfulWorkflowCompletion(PROCESS_ENTITY_NAME, OPERATION, LOGS_DIR);
 
         debug(service.getGraph());
         GraphUtils.dump(service.getGraph());
@@ -233,6 +236,10 @@ public class MetadataMappingServiceTest {
 
     @Test (dependsOnMethods = "testMapLineage")
     public void testOnChange() throws Exception {
+        // shutdown the graph and resurrect for testing
+        service.destroy();
+        service.init();
+
         // cannot modify cluster, adding a new cluster
         bcpCluster = buildCluster("bcp-cluster", "east-coast", "classification=bcp");
         configStore.publish(EntityType.CLUSTER, bcpCluster);
@@ -284,12 +291,13 @@ public class MetadataMappingServiceTest {
         Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), "data-warehouse");
 
         // new cluster
-        Iterator<Edge> clusterEdgeIterator = feedVertex.getEdges(Direction.OUT,
-                RelationshipGraphBuilder.FEED_CLUSTER_EDGE_LABEL).iterator();
-        edge = clusterEdgeIterator.next();
-        Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), clusterEntity.getName());
-        edge = clusterEdgeIterator.next();
-        Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), bcpCluster.getName());
+        List<String> actual = new ArrayList<String>();
+        for (Edge clusterEdge : feedVertex.getEdges(
+                Direction.OUT, RelationshipGraphBuilder.FEED_CLUSTER_EDGE_LABEL)) {
+            actual.add(clusterEdge.getVertex(Direction.IN).<String>getProperty("name"));
+        }
+        Assert.assertTrue(actual.containsAll(Arrays.asList("primary-cluster", "bcp-cluster")),
+                "Actual does not contain expected: " + actual);
     }
 
     @Test(dependsOnMethods = "testOnFeedEntityChange")
@@ -320,14 +328,6 @@ public class MetadataMappingServiceTest {
                 RelationshipGraphBuilder.PROCESS_CLUSTER_EDGE_LABEL).iterator().next();
         Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), bcpCluster.getName());
 
-/*
-        // workflow
-        edge = processVertex.getEdges(Direction.OUT,
-                RelationshipGraphBuilder.PROCESS_WORKFLOW_EDGE_LABEL).iterator().next();
-        Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("version"),
-                newProcess.getWorkflow().getVersion());
-*/
-
         // inputs
         edge = processVertex.getEdges(Direction.IN,
                 RelationshipGraphBuilder.FEED_PROCESS_EDGE_LABEL).iterator().next();
@@ -340,7 +340,7 @@ public class MetadataMappingServiceTest {
         }
     }
 
-    public static void debug(final KeyIndexableGraph graph) {
+    public static void debug(final Graph graph) {
         System.out.println("*****Vertices of " + graph);
         for (Vertex vertex : graph.getVertices()) {
             System.out.println(GraphUtils.vertexString(vertex));
@@ -631,7 +631,7 @@ public class MetadataMappingServiceTest {
         Assert.assertTrue(actual.containsAll(expected), "Actual does not contain expected: " + actual);
     }
 
-    public long getVerticesCount(final KeyIndexableGraph graph) {
+    public long getVerticesCount(final Graph graph) {
         long count = 0;
         for (Vertex ignored : graph.getVertices()) {
             count++;
@@ -640,7 +640,7 @@ public class MetadataMappingServiceTest {
         return count;
     }
 
-    public long getEdgesCount(final KeyIndexableGraph graph) {
+    public long getEdgesCount(final Graph graph) {
         long count = 0;
         for (Edge ignored : graph.getEdges()) {
             count++;
@@ -656,7 +656,7 @@ public class MetadataMappingServiceTest {
                 "imp-click-join1/20140101", "imp-click-join2/20140101");
         Assert.assertTrue(feedNamesOwnedByUser.containsAll(expected));
 
-        KeyIndexableGraph graph = service.getGraph();
+        Graph graph = service.getGraph();
 
         Iterator<Vertex> vertices = graph.getVertices("name", "impression-feed/20140101").iterator();
         Assert.assertTrue(vertices.hasNext());
@@ -708,7 +708,7 @@ public class MetadataMappingServiceTest {
         };
     }
 
-    private void cleanupGraphStore(KeyIndexableGraph graph) {
+    private void cleanupGraphStore(Graph graph) {
         for (Edge edge : graph.getEdges()) {
             graph.removeEdge(edge);
         }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6b328862/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
index 6584dfe..fc4eabd 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
@@ -104,6 +104,11 @@ public class FalconPostProcessing extends Configured implements Tool {
         LOG.info("Sending user message " + cmd);
         invokeUserMessageProducer(cmd);
 
+        if ("SUCCEEDED".equals(Arg.STATUS.getOptionValue(cmd))) {
+            LOG.info("Recording lineage for " + cmd);
+            recordLineageMetadata(cmd);
+        }
+
         //LogMover doesn't throw exception, a failed log mover will not fail the user workflow
         LOG.info("Moving logs " + cmd);
         invokeLogProducer(cmd);
@@ -111,11 +116,6 @@ public class FalconPostProcessing extends Configured implements Tool {
         LOG.info("Sending falcon message " + cmd);
         invokeFalconMessageProducer(cmd);
 
-        if ("SUCCEEDED".equals(Arg.STATUS.getOptionValue(cmd))) {
-            LOG.info("Recording lineage for " + cmd);
-            recordLineageMetadata(cmd);
-        }
-
         return 0;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6b328862/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java b/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
index f140bca..7c307e3 100644
--- a/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
+++ b/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
@@ -20,7 +20,8 @@ package org.apache.falcon.resource.metadata;
 
 import com.tinkerpop.blueprints.Direction;
 import com.tinkerpop.blueprints.Edge;
-import com.tinkerpop.blueprints.KeyIndexableGraph;
+import com.tinkerpop.blueprints.Element;
+import com.tinkerpop.blueprints.Graph;
 import com.tinkerpop.blueprints.Vertex;
 import com.tinkerpop.blueprints.VertexQuery;
 import com.tinkerpop.blueprints.util.io.graphson.GraphSONMode;
@@ -65,7 +66,7 @@ public class LineageMetadataResource {
         }
     }
 
-    private KeyIndexableGraph getGraph() {
+    private Graph getGraph() {
         return service.getGraph();
     }
 
@@ -113,17 +114,7 @@ public class LineageMetadataResource {
         checkIfMetadataMappingServiceIsEnabled();
         LOG.info("Get All Vertices");
         try {
-            JSONArray vertexArray = new JSONArray();
-            long counter = 0;
-            for (Vertex vertex : getGraph().getVertices()) {
-                counter++;
-                vertexArray.put(GraphSONUtility.jsonFromElement(
-                        vertex, getVertexIndexedKeys(), GraphSONMode.NORMAL));
-            }
-
-            JSONObject response = new JSONObject();
-            response.put(RESULTS, vertexArray);
-            response.put(TOTAL_SIZE, counter);
+            JSONObject response = buildJSONResponse(getGraph().getVertices());
             return Response.ok(response).build();
         } catch (JSONException e) {
             throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
@@ -176,20 +167,9 @@ public class LineageMetadataResource {
         checkIfMetadataMappingServiceIsEnabled();
         LOG.info("Get vertices for property name= " + name + ", value= " + value);
         try {
-            Iterable<Vertex> vertices = getGraph().getVertices(name, value);
-            final JSONArray vertexArray = new JSONArray();
-
-            long counter = 0;
-            for (Vertex vertex : vertices) {
-                counter++;
-                vertexArray.put(GraphSONUtility.jsonFromElement(
-                        vertex, getVertexIndexedKeys(), GraphSONMode.NORMAL));
-            }
-
-            JSONObject response = new JSONObject();
-            response.put(RESULTS, vertexArray);
-            response.put(TOTAL_SIZE, counter);
+            JSONObject response = buildJSONResponse(getGraph().getVertices(name, value));
             return Response.ok(response).build();
+
         } catch (JSONException e) {
             throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
                     .entity(JSONObject.quote("An error occurred: " + e.getMessage())).build());
@@ -219,53 +199,58 @@ public class LineageMetadataResource {
                         .entity(JSONObject.quote(message)).build());
             }
 
-            // break out the segment into the return and the direction
-            VertexQueryArguments queryArguments = new VertexQueryArguments(direction);
-            // if this is a query and the _return is "count" then we don't bother to send back the result array
-            boolean countOnly = queryArguments.isCountOnly();
-            // what kind of data the calling client wants back (vertices, edges, count, vertex identifiers)
-            ReturnType returnType = queryArguments.getReturnType();
-            // the query direction (both, out, in)
-            Direction queryDirection = queryArguments.getQueryDirection();
-
-            VertexQuery query = vertex.query().direction(queryDirection);
-
-            JSONArray elementArray = new JSONArray();
-            long counter = 0;
-            if (returnType == ReturnType.VERTICES || returnType == ReturnType.VERTEX_IDS) {
-                Iterable<Vertex> vertexQueryResults = query.vertices();
-                for (Vertex v : vertexQueryResults) {
-                    if (returnType.equals(ReturnType.VERTICES)) {
-                        elementArray.put(GraphSONUtility.jsonFromElement(
-                                v, getVertexIndexedKeys(), GraphSONMode.NORMAL));
-                    } else {
-                        elementArray.put(v.getId());
-                    }
-                    counter++;
-                }
-            } else if (returnType == ReturnType.EDGES) {
-                Iterable<Edge> edgeQueryResults = query.edges();
-                for (Edge e : edgeQueryResults) {
-                    elementArray.put(GraphSONUtility.jsonFromElement(
-                            e, getEdgeIndexedKeys(), GraphSONMode.NORMAL));
-                    counter++;
-                }
-            } else if (returnType == ReturnType.COUNT) {
-                counter = query.count();
-            }
+            return getVertexEdges(vertex, direction);
 
-            JSONObject response = new JSONObject();
-            if (!countOnly) {
-                response.put(RESULTS, elementArray);
-            }
-            response.put(TOTAL_SIZE, counter);
-            return Response.ok(response).build();
         } catch (JSONException e) {
             throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
                     .entity(JSONObject.quote("An error occurred: " + e.getMessage())).build());
         }
     }
 
+    private Response getVertexEdges(Vertex vertex, String direction) throws JSONException {
+        // break out the segment into the return and the direction
+        VertexQueryArguments queryArguments = new VertexQueryArguments(direction);
+        // if this is a query and the _return is "count" then we don't bother to send back the result array
+        boolean countOnly = queryArguments.isCountOnly();
+        // what kind of data the calling client wants back (vertices, edges, count, vertex identifiers)
+        ReturnType returnType = queryArguments.getReturnType();
+        // the query direction (both, out, in)
+        Direction queryDirection = queryArguments.getQueryDirection();
+
+        VertexQuery query = vertex.query().direction(queryDirection);
+
+        JSONArray elementArray = new JSONArray();
+        long counter = 0;
+        if (returnType == ReturnType.VERTICES || returnType == ReturnType.VERTEX_IDS) {
+            Iterable<Vertex> vertexQueryResults = query.vertices();
+            for (Vertex v : vertexQueryResults) {
+                if (returnType.equals(ReturnType.VERTICES)) {
+                    elementArray.put(GraphSONUtility.jsonFromElement(
+                            v, getVertexIndexedKeys(), GraphSONMode.NORMAL));
+                } else {
+                    elementArray.put(v.getId());
+                }
+                counter++;
+            }
+        } else if (returnType == ReturnType.EDGES) {
+            Iterable<Edge> edgeQueryResults = query.edges();
+            for (Edge e : edgeQueryResults) {
+                elementArray.put(GraphSONUtility.jsonFromElement(
+                        e, getEdgeIndexedKeys(), GraphSONMode.NORMAL));
+                counter++;
+            }
+        } else if (returnType == ReturnType.COUNT) {
+            counter = query.count();
+        }
+
+        JSONObject response = new JSONObject();
+        if (!countOnly) {
+            response.put(RESULTS, elementArray);
+        }
+        response.put(TOTAL_SIZE, counter);
+        return Response.ok(response).build();
+    }
+
     /**
      * Get all edges.
      *
@@ -279,18 +264,9 @@ public class LineageMetadataResource {
         checkIfMetadataMappingServiceIsEnabled();
         LOG.info("Get All Edges.");
         try {
-            JSONArray vertexArray = new JSONArray();
-            long counter = 0;
-            for (Edge edge : getGraph().getEdges()) {
-                counter++;
-                vertexArray.put(GraphSONUtility.jsonFromElement(
-                        edge, getEdgeIndexedKeys(), GraphSONMode.NORMAL));
-            }
-
-            JSONObject response = new JSONObject();
-            response.put(RESULTS, vertexArray);
-            response.put(TOTAL_SIZE, counter);
+            JSONObject response = buildJSONResponse(getGraph().getEdges());
             return Response.ok(response).build();
+
         } catch (JSONException e) {
             throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
                     .entity(JSONObject.quote("An error occurred: " + e.getMessage())).build());
@@ -328,6 +304,22 @@ public class LineageMetadataResource {
         }
     }
 
+    private <T extends Element> JSONObject buildJSONResponse(Iterable<T> elements) throws JSONException {
+        JSONArray vertexArray = new JSONArray();
+        long counter = 0;
+        for (Element element : elements) {
+            counter++;
+            vertexArray.put(GraphSONUtility.jsonFromElement(
+                    element, getVertexIndexedKeys(), GraphSONMode.NORMAL));
+        }
+
+        JSONObject response = new JSONObject();
+        response.put(RESULTS, vertexArray);
+        response.put(TOTAL_SIZE, counter);
+
+        return response;
+    }
+
     private void checkIfMetadataMappingServiceIsEnabled() {
         if (service == null) {
             throw new WebApplicationException(

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6b328862/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
index a32d2ee..64143e9 100644
--- a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
+++ b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
@@ -155,7 +155,7 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener
     private void notifyMetadataMappingService(String entityName, String operation,
                                               String logDir) throws FalconException {
         MetadataMappingService service = Services.get().getService(MetadataMappingService.SERVICE_NAME);
-        service.mapLineage(entityName, operation, logDir);
+        service.onSuccessfulWorkflowCompletion(entityName, operation, logDir);
     }
 
     private void debug(MapMessage mapMessage) throws JMSException {


Mime
View raw message