falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From venkat...@apache.org
Subject [2/3] git commit: FALCON-694 StringIndexOutOfBoundsException while updating graph DB for replicated instance. Contributed by Sowmya Ramesh
Date Tue, 09 Sep 2014 18:50:36 GMT
FALCON-694 StringIndexOutOfBoundsException while updating graph DB for replicated instance.
Contributed by Sowmya Ramesh


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/e9e849e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/e9e849e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/e9e849e7

Branch: refs/heads/master
Commit: e9e849e73fd5478c943f1b43daac76f8f8f232b1
Parents: e59bef7
Author: Venkatesh Seetharam <venkatesh@apache.org>
Authored: Tue Sep 9 11:49:21 2014 -0700
Committer: Venkatesh Seetharam <venkatesh@apache.org>
Committed: Tue Sep 9 11:49:21 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../InstanceRelationshipGraphBuilder.java       |   5 +-
 .../workflow/WorkflowExecutionContext.java      |  22 +---
 .../metadata/MetadataMappingServiceTest.java    | 131 +++++++++++++++----
 .../feed/FeedReplicationCoordinatorBuilder.java |   4 -
 .../feed/OozieFeedWorkflowBuilderTest.java      |   6 +-
 .../falcon/oozie/process/AbstractTestBase.java  |  13 +-
 .../cluster/util/EntityBuilderTestUtil.java     |  21 ++-
 8 files changed, 130 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e9e849e7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 757dd99..4884512 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -77,6 +77,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-694 StringIndexOutOfBoundsException while updating graph DB for
+   replicated instance (Sowmya Ramesh via Venkatesh Seetharam)
+
    FALCON-695 Lineage: "stored-in" edge is added between feed entity and
    target cluster (Sowmya Ramesh via Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e9e849e7/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index 2f9fe8e..4d9fbcf 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -185,16 +185,15 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder
{
         String[] outputFeedNames = context.getOutputFeedNamesList();
         String[] outputFeedInstancePaths = context.getOutputFeedInstancePathsList();
         String targetClusterName = context.getClusterName();
-        String srcClusterName = context.getSrcClusterName();
 
         // For replication there will be only one output feed name
         String feedName = outputFeedNames[0];
         String feedInstanceDataPath = outputFeedInstancePaths[0];
 
         LOG.info("Computing feed instance for : name=" + feedName + ", path= "
-                + feedInstanceDataPath + ", in cluster: " + srcClusterName);
+                + feedInstanceDataPath + ", in cluster: " + targetClusterName);
         RelationshipType vertexType = RelationshipType.FEED_INSTANCE;
-        String feedInstanceName = getFeedInstanceName(feedName, srcClusterName,
+        String feedInstanceName = getFeedInstanceName(feedName, targetClusterName,
                 feedInstanceDataPath, context.getNominalTimeAsISO8601());
         Vertex feedInstanceVertex = findVertex(feedInstanceName, vertexType);
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e9e849e7/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index c074484..9c7b395 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -54,7 +54,6 @@ public class WorkflowExecutionContext {
 
     public static final String OUTPUT_FEED_SEPARATOR = ",";
     public static final String INPUT_FEED_SEPARATOR = "#";
-    public static final String CLUSTER_NAME_SEPARATOR = ",";
 
     /**
      * Workflow execution status.
@@ -161,26 +160,7 @@ public class WorkflowExecutionContext {
     }
 
     public String getClusterName() {
-        String value =  getValue(WorkflowExecutionArgs.CLUSTER_NAME);
-        if (EntityOperations.REPLICATE != getOperation()) {
-            return value;
-        }
-
-        return value.split(CLUSTER_NAME_SEPARATOR)[0];
-    }
-
-    public String getSrcClusterName() {
-        String value =  getValue(WorkflowExecutionArgs.CLUSTER_NAME);
-        if (EntityOperations.REPLICATE != getOperation()) {
-            return value;
-        }
-
-        String[] parts = value.split(CLUSTER_NAME_SEPARATOR);
-        if (parts.length != 2) {
-            throw new IllegalArgumentException("Replicated cluster pair is missing in " +
value);
-        }
-
-        return parts[1];
+        return getValue(WorkflowExecutionArgs.CLUSTER_NAME);
     }
 
     public String getEntityName() {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e9e849e7/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 7b73a91..3b9fdba 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -30,6 +30,7 @@ import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
@@ -92,9 +93,8 @@ public class MetadataMappingServiceTest {
     public static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2";
     public static final String OUTPUT_INSTANCE_PATHS =
         "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101";
-    private static final String REPLICATED_OUTPUT_INSTANCE_PATHS =
-            "jail://global:00/falcon/imp-click-join1/20140101";
-    private static final String EVICTED_OUTPUT_INSTANCE_PATHS =
+    private static final String REPLICATED_INSTANCE = "raw-click";
+    private static final String EVICTED_INSTANCE_PATHS =
             "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join1/20140102";
     public static final String OUTPUT_INSTANCE_PATHS_NO_DATE =
             "jail://global:00/falcon/imp-click-join1,jail://global:00/falcon/imp-click-join2";
@@ -273,24 +273,33 @@ public class MetadataMappingServiceTest {
     public void  testLineageForReplication() throws Exception {
         setupForLineageReplication();
 
-        String feedName = "imp-click-join1";
         WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
-            EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, feedName,
-            REPLICATED_OUTPUT_INSTANCE_PATHS, null, null), WorkflowExecutionContext.Type.POST_PROCESSING);
+                        EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, REPLICATED_INSTANCE,
+                        "jail://global:00/falcon/raw-click/bcp/20140101",
+                        "jail://global:00/falcon/raw-click/primary/20140101", REPLICATED_INSTANCE),
+                WorkflowExecutionContext.Type.POST_PROCESSING);
         service.onSuccess(context);
 
         debug(service.getGraph());
         GraphUtils.dump(service.getGraph());
-        verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName());
 
-        verifyLineageGraphForReplicationOrEviction(feedName, REPLICATED_OUTPUT_INSTANCE_PATHS,
context,
+        verifyLineageGraphForReplicationOrEviction(REPLICATED_INSTANCE,
+                "jail://global:00/falcon/raw-click/bcp/20140101", context,
                 RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE);
 
-        // +3 = cluster, colo, tag
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 26);
-        // +3 = +2 edges for bcp cluster, no user but only to colo and new tag  + 1 for replicated-to
edge to target
-        // cluster for each output feed instance
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 74);
+        // +6 [primary, bcp cluster] = cluster, colo, tag,
+        // +4 [input feed] = feed, tag, group, user
+        // +4 [output feed] = 1 feed + 1 tag + 2 groups
+        // +4 [process] = 1 process + 1 tag + 2 pipeline
+        // +3 = 1 process, 1 input, 1 output
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 21);
+        // +4 [cluster] = cluster to colo and tag [primary and bcp],
+        // +4 [input feed] = cluster, tag, group, user
+        // +5 [output feed] = cluster + user + Group + 2Tags
+        // +7 = user,tag,cluster, 1 input,1 output, 2 pipelines
+        // +19 = +6 for output feed instances + 7 for process instance + 6 for input feed
instance
+        // +1 for replicated-to edge to target cluster for each output feed instance
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 40);
     }
 
     @Test
@@ -313,7 +322,7 @@ public class MetadataMappingServiceTest {
         List<String> ownedAndSecureFeeds = Arrays.asList("clicks-feed/2014-01-01T00:00Z",
                 "imp-click-join1/2014-01-01T00:00Z", "imp-click-join1/2014-01-02T00:00Z");
         verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName(), expectedFeeds, secureFeeds,
ownedAndSecureFeeds);
-        String[] paths = EVICTED_OUTPUT_INSTANCE_PATHS.split(EvictionHelper.INSTANCEPATH_SEPARATOR);
+        String[] paths = EVICTED_INSTANCE_PATHS.split(EvictionHelper.INSTANCEPATH_SEPARATOR);
         for (String feedInstanceDataPath : paths) {
             verifyLineageGraphForReplicationOrEviction(feedName, feedInstanceDataPath, context,
                     RelationshipLabel.FEED_CLUSTER_EVICTED_EDGE);
@@ -481,10 +490,20 @@ public class MetadataMappingServiceTest {
     }
 
     private Feed addFeedEntity(String feedName, Cluster cluster, String tags, String groups,
-                              Storage.TYPE storageType, String uriTemplate) throws Exception
{
-        Feed feed = EntityBuilderTestUtil.buildFeed(feedName, cluster,
+                               Storage.TYPE storageType, String uriTemplate) throws Exception
{
+        return addFeedEntity(feedName, new Cluster[]{cluster}, tags, groups, storageType,
uriTemplate);
+    }
+
+    private Feed addFeedEntity(String feedName, Cluster[] clusters, String tags, String groups,
+                               Storage.TYPE storageType, String uriTemplate) throws Exception
{
+        Feed feed = EntityBuilderTestUtil.buildFeed(feedName, clusters,
                 tags, groups);
         addStorage(feed, storageType, uriTemplate);
+        for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters())
{
+            if (feedCluster.getName().equals(BCP_CLUSTER_ENTITY_NAME)) {
+                feedCluster.setType(ClusterType.TARGET);
+            }
+        }
         configStore.publish(EntityType.FEED, feed);
         return feed;
     }
@@ -524,6 +543,24 @@ public class MetadataMappingServiceTest {
         }
     }
 
+    private static void addStorage(org.apache.falcon.entity.v0.feed.Cluster cluster, Feed
feed,
+                                   Storage.TYPE storageType, String uriTemplate) {
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            Locations locations = new Locations();
+            feed.setLocations(locations);
+
+            Location location = new Location();
+            location.setType(LocationType.DATA);
+            location.setPath(uriTemplate);
+            cluster.setLocations(new Locations());
+            cluster.getLocations().getLocations().add(location);
+        } else {
+            CatalogTable table = new CatalogTable();
+            table.setUri(uriTemplate);
+            cluster.setTable(table);
+        }
+    }
+
     private void verifyEntityWasAddedToGraph(String entityName, RelationshipType entityType)
{
         Vertex entityVertex = getEntityVertex(entityName, entityType);
         Assert.assertNotNull(entityVertex);
@@ -759,7 +796,7 @@ public class MetadataMappingServiceTest {
                                                             WorkflowExecutionContext context,
                                                             RelationshipLabel edgeLabel)
throws Exception {
         String feedInstanceName = InstanceRelationshipGraphBuilder.getFeedInstanceName(feedName
-                , context.getSrcClusterName(), feedInstanceDataPath, context.getNominalTimeAsISO8601());
+                , context.getClusterName(), feedInstanceDataPath, context.getNominalTimeAsISO8601());
         Vertex feedVertex = getEntityVertex(feedInstanceName, RelationshipType.FEED_INSTANCE);
 
         Edge edge = feedVertex.getEdges(Direction.OUT, edgeLabel.getName())
@@ -777,7 +814,7 @@ public class MetadataMappingServiceTest {
                                                String falconInputFeeds) {
         String cluster;
         if (EntityOperations.REPLICATE == operation) {
-            cluster = BCP_CLUSTER_ENTITY_NAME + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR
+ CLUSTER_ENTITY_NAME;
+            cluster = BCP_CLUSTER_ENTITY_NAME;
         } else {
             cluster = CLUSTER_ENTITY_NAME;
         }
@@ -858,14 +895,58 @@ public class MetadataMappingServiceTest {
     }
 
     private void setupForLineageReplication() throws Exception {
-        setup();
+        cleanUp();
+        service.init();
+
+        // Add cluster
+        clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
+                "classification=production");
+        // Add backup cluster
+        Cluster bcpCluster = addClusterEntity(BCP_CLUSTER_ENTITY_NAME, "east-coast", "classification=bcp");
+
+        Cluster[] clusters = {clusterEntity, bcpCluster};
+
+        // Add feed
+        Feed rawFeed = addFeedEntity(REPLICATED_INSTANCE, clusters,
+                "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
+                "/falcon/raw-click/${YEAR}/${MONTH}/${DAY}");
+        // Add uri template for each cluster
+        for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : rawFeed.getClusters().getClusters())
{
+            if (feedCluster.getName().equals(CLUSTER_ENTITY_NAME)) {
+                addStorage(feedCluster, rawFeed, Storage.TYPE.FILESYSTEM,
+                        "/falcon/raw-click/primary/${YEAR}/${MONTH}/${DAY}");
+            } else {
+                addStorage(feedCluster, rawFeed, Storage.TYPE.FILESYSTEM,
+                        "/falcon/raw-click/bcp/${YEAR}/${MONTH}/${DAY}");
+            }
+        }
+
+        // update config store
+        try {
+            configStore.initiateUpdate(rawFeed);
+            configStore.update(EntityType.FEED, rawFeed);
+        } finally {
+            configStore.cleanupUpdateInit();
+        }
+        inputFeeds.add(rawFeed);
+
+        // Add output feed
+        Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
+                "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+                "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
+        outputFeeds.add(join1Feed);
+
+        processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
+                "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
+                WORKFLOW_VERSION);
+
         // GENERATE WF should have run before this to create all instance related vertices
         WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
-            EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, null, null, null, null)
-            , WorkflowExecutionContext.Type.POST_PROCESSING);
+                EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, "imp-click-join1",
+                "jail://global:00/falcon/imp-click-join1/20140101",
+                "jail://global:00/falcon/raw-click/primary/20140101",
+                REPLICATED_INSTANCE), WorkflowExecutionContext.Type.POST_PROCESSING);
         service.onSuccess(context);
-        // Add backup cluster
-        addClusterEntity(BCP_CLUSTER_ENTITY_NAME, "east-coast", "classification=bcp");
     }
 
     private void setupForLineageEviciton() throws Exception {
@@ -884,12 +965,12 @@ public class MetadataMappingServiceTest {
         // GENERATE WF should have run before this to create all instance related vertices
         WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
                         EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME,
-                        "imp-click-join1,imp-click-join1", EVICTED_OUTPUT_INSTANCE_PATHS,
null, null),
+                        "imp-click-join1,imp-click-join1", EVICTED_INSTANCE_PATHS, null,
null),
                 WorkflowExecutionContext.Type.POST_PROCESSING);
         service.onSuccess(context);
 
         // Write to csv file
-        String csvData = EVICTED_OUTPUT_INSTANCE_PATHS;
+        String csvData = EVICTED_INSTANCE_PATHS;
         logFilePath = hdfsUrl + LOGS_DIR + "/" + LOG_FILE;
         Path path = new Path(logFilePath);
         EvictionHelper.logInstancePaths(path.getFileSystem(EmbeddedCluster.newConfiguration()),
path, csvData);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e9e849e7/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index 5697eb6..966f90e 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -159,10 +159,6 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
 
         workflow.setAppPath(getStoragePath(buildPath));
         Properties props = createCoordDefaultConfiguration(trgCluster, wfName);
-        // Override CLUSTER_NAME property to include both source and target cluster
-        String clusterProperty = trgCluster.getName()
-                + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + srcCluster.getName();
-        props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), clusterProperty);
         props.put("srcClusterName", srcCluster.getName());
         props.put("srcClusterColo", srcCluster.getColo());
         if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e9e849e7/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index 379cf34..3c49353 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -192,7 +192,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
 
         HashMap<String, String> props = getCoordProperties(coord);
 
-        verifyEntityProperties(feed, trgCluster, srcCluster,
+        verifyEntityProperties(feed, trgCluster,
                 WorkflowExecutionContext.EntityOperations.REPLICATE, props);
         verifyBrokerProperties(trgCluster, props);
 
@@ -332,7 +332,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("maxMaps"), "33");
         Assert.assertEquals(props.get("mapBandwidthKB"), "2048");
 
-        verifyEntityProperties(aFeed, aCluster, srcCluster,
+        verifyEntityProperties(aFeed, aCluster,
                 WorkflowExecutionContext.EntityOperations.REPLICATE, props);
         verifyBrokerProperties(trgCluster, props);
     }
@@ -456,7 +456,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         assertReplicationHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord),
                 wfPath.toString());
 
-        verifyEntityProperties(tableFeed, trgCluster, srcCluster,
+        verifyEntityProperties(tableFeed, trgCluster,
                 WorkflowExecutionContext.EntityOperations.REPLICATE, props);
         verifyBrokerProperties(trgCluster, props);
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e9e849e7/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
index 4e260e9..b547c31 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
@@ -217,22 +217,11 @@ public class AbstractTestBase {
     protected void verifyEntityProperties(Entity entity, Cluster cluster,
                                           WorkflowExecutionContext.EntityOperations operation,
                                           HashMap<String, String> props) throws Exception
{
-        verifyEntityProperties(entity, cluster, null, operation, props);
-    }
-
-    protected void verifyEntityProperties(Entity entity, Cluster cluster, Cluster srcCluster,
-                                          WorkflowExecutionContext.EntityOperations operation,
-                                          HashMap<String, String> props) throws Exception
{
         Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_NAME.getName()),
                 entity.getName());
         Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_TYPE.getName()),
                 entity.getEntityType().name());
-        if (WorkflowExecutionContext.EntityOperations.REPLICATE == operation) {
-            Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()),
-                    cluster.getName() + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR +
srcCluster.getName());
-        } else {
-            Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()),
cluster.getName());
-        }
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), cluster.getName());
         Assert.assertEquals(props.get(WorkflowExecutionArgs.LOG_DIR.getName()), getLogPath(cluster,
entity));
         Assert.assertEquals(props.get("falconDataOperation"), operation.name());
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e9e849e7/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java
----------------------------------------------------------------------
diff --git a/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java
b/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java
index b13ec08..66ceb37 100644
--- a/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java
+++ b/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java
@@ -74,7 +74,7 @@ public final class EntityBuilderTestUtil {
         return cluster;
     }
 
-    public static Feed buildFeed(String feedName, Cluster cluster, String tags, String groups)
{
+    public static Feed buildFeed(String feedName, Cluster[] clusters, String tags, String
groups) {
         Feed feed = new Feed();
         feed.setName(feedName);
         feed.setTags(tags);
@@ -82,12 +82,15 @@ public final class EntityBuilderTestUtil {
         feed.setFrequency(Frequency.fromString("hours(1)"));
 
         org.apache.falcon.entity.v0.feed.Clusters
-                clusters = new org.apache.falcon.entity.v0.feed.Clusters();
-        feed.setClusters(clusters);
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
-                new org.apache.falcon.entity.v0.feed.Cluster();
-        feedCluster.setName(cluster.getName());
-        clusters.getClusters().add(feedCluster);
+                feedClusters = new org.apache.falcon.entity.v0.feed.Clusters();
+        feed.setClusters(feedClusters);
+
+        for (Cluster cluster : clusters) {
+            org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+                    new org.apache.falcon.entity.v0.feed.Cluster();
+            feedCluster.setName(cluster.getName());
+            feedClusters.getClusters().add(feedCluster);
+        }
 
         org.apache.falcon.entity.v0.feed.ACL feedACL = new org.apache.falcon.entity.v0.feed.ACL();
         feedACL.setOwner(USER);
@@ -98,6 +101,10 @@ public final class EntityBuilderTestUtil {
         return feed;
     }
 
+    public static Feed buildFeed(String feedName, Cluster cluster, String tags, String groups)
{
+        return buildFeed(feedName, new Cluster[]{cluster}, tags, groups);
+    }
+
     public static org.apache.falcon.entity.v0.process.Process buildProcess(String processName,
                                                                            Cluster cluster,
                                                                            String tags) throws
Exception {


Mime
View raw message