falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject falcon git commit: FALCON-1859 Feed export instances are not added to Graph DB
Date Fri, 03 Jun 2016 14:50:26 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 85ed40eba -> 16d2b39b2


FALCON-1859 Feed export instances are not added to Graph DB

Author: Venkatesan Ramachandran <vramachandran@hortonworks.com>

Reviewers: "Balu Vellanki <balu@apache.org>"

Closes #165 from vramachan/FALCON-1859.Export.GraphDB


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/16d2b39b
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/16d2b39b
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/16d2b39b

Branch: refs/heads/master
Commit: 16d2b39b24f30b3562874585fcdc58ecf4460ad0
Parents: 85ed40e
Author: Venkatesan Ramachandran <vramachandran@hortonworks.com>
Authored: Fri Jun 3 07:50:13 2016 -0700
Committer: bvellanki <bvellanki@hortonworks.com>
Committed: Fri Jun 3 07:50:13 2016 -0700

----------------------------------------------------------------------
 .../apache/falcon/entity/v0/EntityGraph.java    | 10 +++
 .../falcon/entity/v0/EntityGraphTest.java       | 73 +++++++++++++++++++-
 2 files changed, 82 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/16d2b39b/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java b/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
index e4d9385..acb570e 100644
--- a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
+++ b/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
@@ -200,6 +200,16 @@ public final class EntityGraph implements ConfigurationChangeListener {
                 feedEdges.add(dbNode);
                 dbEdges.add(feedNode);
             }
+
+            if (FeedHelper.isExportEnabled(cluster)) {
+                Node dbNode = new Node(EntityType.DATASOURCE, FeedHelper.getExportDatasourceName(cluster));
+                if (!nodeEdges.containsKey(dbNode)) {
+                    nodeEdges.put(dbNode, new HashSet<Node>());
+                }
+                Set<Node> dbEdges = nodeEdges.get(dbNode);
+                feedEdges.add(dbNode);
+                dbEdges.add(feedNode);
+            }
         }
         return nodeEdges;
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/16d2b39b/common/src/test/java/org/apache/falcon/entity/v0/EntityGraphTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/v0/EntityGraphTest.java b/common/src/test/java/org/apache/falcon/entity/v0/EntityGraphTest.java
index 23f69d7..b41cc03 100644
--- a/common/src/test/java/org/apache/falcon/entity/v0/EntityGraphTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/v0/EntityGraphTest.java
@@ -20,17 +20,22 @@ package org.apache.falcon.entity.v0;
 
 import org.apache.falcon.entity.AbstractTestBase;
 import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Load;
 import org.apache.falcon.entity.v0.feed.Argument;
 import org.apache.falcon.entity.v0.feed.Arguments;
 import org.apache.falcon.entity.v0.feed.Clusters;
 import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Extract;
 import org.apache.falcon.entity.v0.feed.ExtractMethod;
-import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.FieldsType;
 import org.apache.falcon.entity.v0.feed.FieldIncludeExclude;
 import org.apache.falcon.entity.v0.feed.Import;
 import org.apache.falcon.entity.v0.feed.MergeType;
+import org.apache.falcon.entity.v0.feed.Export;
+import org.apache.falcon.entity.v0.feed.LoadMethod;
+
+
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.datasource.Datasource;
 import org.apache.falcon.entity.v0.process.Input;
@@ -184,6 +189,36 @@ public class EntityGraphTest extends AbstractTestBase {
         return imp;
     }
 
+    private Feed addFeedExport(String feed, Cluster cluster, Datasource ds) {
+
+        Feed f1 = new Feed();
+        f1.setName(feed);
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+                new org.apache.falcon.entity.v0.feed.Cluster();
+        feedCluster.setName(cluster.getName());
+        feedCluster.setType(ClusterType.SOURCE);
+        Clusters clusters = new Clusters();
+        clusters.getClusters().add(feedCluster);
+        f1.setClusters(clusters);
+
+        Export exp = getAnExport(LoadMethod.UPDATEONLY, ds);
+        f1.getClusters().getClusters().get(0).setExport(exp);
+        return f1;
+    }
+
+    private Export getAnExport(LoadMethod loadMethod, Datasource ds) {
+
+        org.apache.falcon.entity.v0.feed.Datasource target = new org.apache.falcon.entity.v0.feed.Datasource();
+        target.setName(ds.getName());
+        target.setTableName("test-table");
+        Load load = new Load();
+        load.setType(loadMethod);
+        target.setLoad(load);
+        Export exp = new Export();
+        exp.setTarget(target);
+        return exp;
+    }
+
     private void attachInput(Process process, Feed feed) {
         if (process.getInputs() == null) {
             process.setInputs(new Inputs());
@@ -382,6 +417,42 @@ public class EntityGraphTest extends AbstractTestBase {
     }
 
     @Test
+    public void testOnAddExport() throws Exception {
+
+        Datasource ds = new Datasource();
+        ds.setName("test-db");
+        ds.setColo("c1");
+
+        Cluster cluster = new Cluster();
+        cluster.setName("ci1");
+        cluster.setColo("c1");
+
+        Feed f1 = addFeedExport("fe1", cluster, ds);
+
+        store.publish(EntityType.CLUSTER, cluster);
+        store.publish(EntityType.DATASOURCE, ds);
+        store.publish(EntityType.FEED, f1);
+
+        Set<Entity> entities = graph.getDependents(cluster);
+        Assert.assertEquals(entities.size(), 1);
+        Assert.assertTrue(entities.contains(f1));
+
+        entities = graph.getDependents(ds);
+        Assert.assertEquals(entities.size(), 1);
+        Assert.assertTrue(entities.contains(f1));
+
+        entities = graph.getDependents(f1);
+        Assert.assertEquals(entities.size(), 2);
+        Assert.assertTrue(entities.contains(cluster));
+        Assert.assertTrue(entities.contains(ds));
+
+        store.remove(EntityType.FEED, "fe1");
+        store.remove(EntityType.DATASOURCE, "test-db");
+        store.remove(EntityType.CLUSTER, "ci1");
+    }
+
+
+    @Test
     public void testOnRemoveDatasource() throws Exception {
 
         Datasource ds = new Datasource();


Mime
View raw message