Return-Path: X-Original-To: apmail-falcon-commits-archive@minotaur.apache.org Delivered-To: apmail-falcon-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 583F5102EF for ; Thu, 7 Nov 2013 19:15:56 +0000 (UTC) Received: (qmail 63987 invoked by uid 500); 7 Nov 2013 19:15:56 -0000 Delivered-To: apmail-falcon-commits-archive@falcon.apache.org Received: (qmail 63951 invoked by uid 500); 7 Nov 2013 19:15:56 -0000 Mailing-List: contact commits-help@falcon.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@falcon.incubator.apache.org Delivered-To: mailing list commits@falcon.incubator.apache.org Received: (qmail 63944 invoked by uid 99); 7 Nov 2013 19:15:56 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 07 Nov 2013 19:15:56 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 07 Nov 2013 19:15:54 +0000 Received: (qmail 63798 invoked by uid 99); 7 Nov 2013 19:15:34 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 07 Nov 2013 19:15:34 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 27FA131D54A; Thu, 7 Nov 2013 19:15:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: venkatesh@apache.org To: commits@falcon.incubator.apache.org Date: Thu, 07 Nov 2013 19:15:35 -0000 Message-Id: <51a4b93a7bdc49da9f9479fcfd7be9f0@git.apache.org> In-Reply-To: <6be45d87d4c64e8f826b09da4a49d49a@git.apache.org> References: <6be45d87d4c64e8f826b09da4a49d49a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/3] git commit: FALCON-169 multiple "/" in target for replication for multi target feed. Contributed by Venkatesh Seetharam X-Virus-Checked: Checked by ClamAV on apache.org FALCON-169 multiple "/" in target for replication for multi target feed. Contributed by Venkatesh Seetharam Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/b3b42ef9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/b3b42ef9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/b3b42ef9 Branch: refs/heads/FALCON-85 Commit: b3b42ef94c4c1d750331f89b5b111104a9a95548 Parents: 023f9f4 Author: Venkatesh Seetharam Authored: Thu Nov 7 11:08:23 2013 -0800 Committer: Venkatesh Seetharam Committed: Thu Nov 7 11:08:23 2013 -0800 ---------------------------------------------------------------------- .../apache/falcon/entity/FileSystemStorage.java | 7 +- .../falcon/entity/FileSystemStorageTest.java | 24 +++++-- .../lifecycle/FileSystemFeedReplicationIT.java | 67 +++++++++++++++++- .../table/complex-replicating-feed.xml | 71 ++++++++++++++++++++ .../resources/table/target-cluster-alpha.xml | 2 +- .../resources/table/target-cluster-beta.xml | 2 +- 6 files changed, 161 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b3b42ef9/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java index 72d9e07..68370c7 100644 --- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java +++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java @@ -23,6 +23,7 @@ import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.Location; import org.apache.falcon.entity.v0.feed.LocationType; import org.apache.falcon.entity.v0.feed.Locations; +import org.apache.hadoop.fs.Path; import java.net.URI; import java.net.URISyntaxException; @@ -164,10 +165,8 @@ public class FileSystemStorage implements Storage { return "/tmp"; } - StringBuilder uriTemplate = new StringBuilder(); - uriTemplate.append(storageUrl); - uriTemplate.append(locationForType.getPath()); - return uriTemplate.toString(); + // normalize the path so trailing and double '/' are removed + return storageUrl + new Path(locationForType.getPath()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b3b42ef9/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java index a059652..6917472 100644 --- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java +++ b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java @@ -21,6 +21,7 @@ package org.apache.falcon.entity; import org.apache.falcon.entity.v0.feed.Location; import org.apache.falcon.entity.v0.feed.LocationType; import org.testng.Assert; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import java.util.ArrayList; @@ -135,16 +136,29 @@ public class FileSystemStorageTest { Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "${nameNode}/foo/bar"); } - @Test - public void testGetUriTemplateWithLocationType() throws Exception { + @DataProvider(name = "locationTestDataProvider") + private Object[][] createLocationTestData() { + return new Object[][] { + {"hdfs://localhost:41020", "/localDC/rc/billing/ua2", "/localDC/rc/billing/ua2"}, + {"hdfs://localhost:41020", "/localDC/rc/billing/ua2/", "/localDC/rc/billing/ua2"}, + {"hdfs://localhost:41020", "/localDC/rc/billing/ua2//", "/localDC/rc/billing/ua2"}, + {"${nameNode}", "/localDC/rc/billing/ua2", "/localDC/rc/billing/ua2"}, + {"${nameNode}", "/localDC/rc/billing/ua2/", "/localDC/rc/billing/ua2"}, + {"${nameNode}", "/localDC/rc/billing/ua2//", "/localDC/rc/billing/ua2"}, + }; + } + + @Test (dataProvider = "locationTestDataProvider") + public void testGetUriTemplateWithLocationType(String storageUrl, String path, + String expected) throws Exception { final Location location = new Location(); - location.setPath("/foo/bar"); + location.setPath(path); location.setType(LocationType.DATA); List locations = new ArrayList(); locations.add(location); - FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations); - Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "hdfs://localhost:41020/foo/bar"); + FileSystemStorage storage = new FileSystemStorage(storageUrl, locations); + Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), storageUrl + expected); } @Test http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b3b42ef9/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java index f00bdd7..058b35c 100644 --- a/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java +++ b/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java @@ -152,6 +152,10 @@ public class FileSystemFeedReplicationIT { .accept(MediaType.APPLICATION_JSON) .get(InstancesResult.class); Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED); + + TestContext.executeWithURL("entity -delete -type feed -name " + feedName); + TestContext.executeWithURL("entity -delete -type cluster -name primary-cluster"); + TestContext.executeWithURL("entity -delete -type cluster -name bcp-cluster"); } @Test (enabled = false) @@ -192,7 +196,7 @@ public class FileSystemFeedReplicationIT { FileSystem beta = FileSystem.get(ClusterHelper.getConfiguration(targetBetaContext.getCluster().getCluster())); Assert.assertTrue(beta.exists(new Path("/falcon/test/target-cluster-beta/customer_beta/" + PARTITION_VALUE))); - FileSystem gamma = FileSystem.get(ClusterHelper.getConfiguration(targetAlphaContext.getCluster().getCluster())); + FileSystem gamma = FileSystem.get(ClusterHelper.getConfiguration(targetGammaContext.getCluster().getCluster())); Assert.assertTrue( gamma.exists(new Path("/falcon/test/target-cluster-gamma/customer_gamma/" + PARTITION_VALUE))); @@ -201,5 +205,66 @@ public class FileSystemFeedReplicationIT { .accept(MediaType.APPLICATION_JSON) .get(InstancesResult.class); Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED); + + TestContext.executeWithURL("entity -delete -type feed -name " + feedName); + TestContext.executeWithURL("entity -delete -type cluster -name primary-cluster"); + TestContext.executeWithURL("entity -delete -type cluster -name target-cluster-alpha"); + TestContext.executeWithURL("entity -delete -type cluster -name target-cluster-beta"); + TestContext.executeWithURL("entity -delete -type cluster -name target-cluster-gamma"); + } + + @Test (enabled = false) + public void testFSReplicationComplex() throws Exception { + // copyTestDataToHDFS + Cluster sourceCluster = sourceContext.getCluster().getCluster(); + FileSystem sourceFS = FileSystem.get(ClusterHelper.getConfiguration(sourceCluster)); + String sourceStorageUrl = ClusterHelper.getStorageUrl(sourceCluster); + final String partitionValue = "2012-10-01-12"; + String sourceLocation = sourceStorageUrl + SOURCE_LOCATION + partitionValue + "/" + sourceCluster.getColo(); + FSUtils.copyResourceToHDFS("/apps/data/data.txt", "data.txt", sourceLocation); + Path sourcePath = new Path(sourceLocation); + Assert.assertTrue(sourceFS.exists(sourcePath)); + + final Map overlay = sourceContext.getUniqueOverlay(); + String filePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay); + Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath)); + + filePath = targetContext.overlayParametersOverTemplate("/table/target-cluster-alpha.xml", overlay); + Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath)); + + filePath = targetContext.overlayParametersOverTemplate("/table/target-cluster-beta.xml", overlay); + Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath)); + + // verify if the partition on the source exists - precondition + Assert.assertTrue(sourceFS.exists(sourcePath)); + + filePath = sourceContext.overlayParametersOverTemplate("/table/complex-replicating-feed.xml", overlay); + Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath)); + + // wait until the workflow job completes + final String feedName = "complex-replicating-feed"; + WorkflowJob jobInfo = OozieTestUtils.getWorkflowJob(targetContext.getCluster().getCluster(), + OozieClient.FILTER_NAME + "=FALCON_FEED_REPLICATION_" + feedName); + Assert.assertEquals(jobInfo.getStatus(), WorkflowJob.Status.SUCCEEDED); + + Assert.assertTrue(sourceFS.exists(new Path(SOURCE_LOCATION + partitionValue))); + + // verify if the partition on the target exists + FileSystem alpha = FileSystem.get(ClusterHelper.getConfiguration(targetAlphaContext.getCluster().getCluster())); + Assert.assertTrue(alpha.exists(new Path("/localDC/rc/billing/ua1/" + partitionValue))); + + FileSystem beta = FileSystem.get(ClusterHelper.getConfiguration(targetBetaContext.getCluster().getCluster())); + Assert.assertTrue(beta.exists(new Path("/localDC/rc/billing/ua2/" + partitionValue))); + + InstancesResult response = targetContext.getService().path("api/instance/running/feed/" + feedName) + .header("Remote-User", "guest") + .accept(MediaType.APPLICATION_JSON) + .get(InstancesResult.class); + Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED); + + TestContext.executeWithURL("entity -delete -type feed -name " + feedName); + TestContext.executeWithURL("entity -delete -type cluster -name primary-cluster"); + TestContext.executeWithURL("entity -delete -type cluster -name target-cluster-alpha"); + TestContext.executeWithURL("entity -delete -type cluster -name target-cluster-beta"); } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b3b42ef9/webapp/src/test/resources/table/complex-replicating-feed.xml ---------------------------------------------------------------------- diff --git a/webapp/src/test/resources/table/complex-replicating-feed.xml b/webapp/src/test/resources/table/complex-replicating-feed.xml new file mode 100644 index 0000000..35e8986 --- /dev/null +++ b/webapp/src/test/resources/table/complex-replicating-feed.xml @@ -0,0 +1,71 @@ + + + + + + + + + + + + online,bi + + minutes(5) + UTC + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b3b42ef9/webapp/src/test/resources/table/target-cluster-alpha.xml ---------------------------------------------------------------------- diff --git a/webapp/src/test/resources/table/target-cluster-alpha.xml b/webapp/src/test/resources/table/target-cluster-alpha.xml index c62303e..528fd41 100644 --- a/webapp/src/test/resources/table/target-cluster-alpha.xml +++ b/webapp/src/test/resources/table/target-cluster-alpha.xml @@ -19,7 +19,7 @@ - http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b3b42ef9/webapp/src/test/resources/table/target-cluster-beta.xml ---------------------------------------------------------------------- diff --git a/webapp/src/test/resources/table/target-cluster-beta.xml b/webapp/src/test/resources/table/target-cluster-beta.xml index 3738575..d2c808a 100644 --- a/webapp/src/test/resources/table/target-cluster-beta.xml +++ b/webapp/src/test/resources/table/target-cluster-beta.xml @@ -19,7 +19,7 @@ -