falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srik...@apache.org
Subject [1/2] git commit: FALCON-180 Disable table replication for multiple sources. Contributed by Venkatesh Seetharam FALCON-179 Table replication must drop partition before import as late reruns fails. Contributed by Venkatesh Seetharam FALCON-182 Disable specifying partitions in inputs with table storage for process. Contributed by Venkatesh Seetharam
Date Mon, 11 Nov 2013 04:49:22 GMT
Updated Branches:
  refs/heads/FALCON-85 b7e678bb1 -> 382781549


FALCON-180 Disable table replication for multiple sources. Contributed by Venkatesh Seetharam
FALCON-179 Table replication must drop partition before import as late reruns fails. Contributed
by Venkatesh Seetharam
FALCON-182 Disable specifying partitions in inputs with table storage for process. Contributed
by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/4fc7cd6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/4fc7cd6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/4fc7cd6c

Branch: refs/heads/FALCON-85
Commit: 4fc7cd6c7a1ec3e5ea3da50cb36fd10a24e449b7
Parents: 6f0731f
Author: srikanth.sundarrajan <srikanth.sundarrajan@inmobi.com>
Authored: Mon Nov 11 10:17:10 2013 +0530
Committer: srikanth.sundarrajan <srikanth.sundarrajan@inmobi.com>
Committed: Mon Nov 11 10:17:10 2013 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  9 +++
 .../falcon/entity/parser/FeedEntityParser.java  | 43 +++++++++++--
 .../entity/parser/ProcessEntityParser.java      | 18 ++++--
 .../entity/parser/FeedEntityParserTest.java     | 22 +++++++
 .../entity/parser/ProcessEntityParserTest.java  | 14 +++++
 .../feed/table-with-multiple-sources-feed.xml   | 53 ++++++++++++++++
 .../config/workflow/falcon-table-import.hql     |  4 +-
 .../TableStorageFeedReplicationIT.java          | 65 ++++++++++++++++++--
 8 files changed, 213 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4fc7cd6c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ff57658..536459a 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -109,6 +109,15 @@ Trunk (Unreleased)
     Srikanth Sundarrajan)
 
   BUG FIXES
+    FALCON-180 Disable table replication for multiple sources. (Venkatesh
+    Seetharam via Srikanth Sundarrajan)
+
+    FALCON-179 Table replication must drop partition before import as 
+    late reruns fails. (Venkatesh Seetharam via Srikanth Sundarrajan)
+
+    FALCON-182 Disable specifying partitions in inputs with table storage 
+    for process. (Venkatesh Seetharam via Srikanth Sundarrajan)
+
     FALCON-138 Remove perf4j dependency. (Jean-Baptiste Onofré via
     Shwetha GS)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4fc7cd6c/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index 0e687bd..8d7903b 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -161,9 +161,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
                     CrossEntityValidations.validateFeedRetentionPeriod(input.getStart(), newFeed, clusterName);
                     CrossEntityValidations.validateInstanceRange(process, input, newFeed);
 
-                    if (input.getPartition() != null) {
-                        CrossEntityValidations.validateInputPartition(input, newFeed);
-                    }
+                    validateInputPartition(newFeed, input);
                 }
             }
 
@@ -181,6 +179,19 @@ public class FeedEntityParser extends EntityParser<Feed> {
         }
     }
 
+    private void validateInputPartition(Feed newFeed, Input input) throws FalconException {
+        if (input.getPartition() == null) {
+            return;
+        }
+
+        final Storage.TYPE baseFeedStorageType = FeedHelper.getStorageType(newFeed);
+        if (baseFeedStorageType == Storage.TYPE.FILESYSTEM) {
+            CrossEntityValidations.validateInputPartition(input, newFeed);
+        } else if (baseFeedStorageType == Storage.TYPE.TABLE) {
+            throw new ValidationException("Input partitions are not supported for table storage: " + input.getName());
+        }
+    }
+
     private void validateClusterValidity(Date start, Date end, String clusterName) throws FalconException {
         try {
             if (start.after(end)) {
@@ -288,12 +299,32 @@ public class FeedEntityParser extends EntityParser<Feed> {
      * Does not matter for FileSystem storage.
      */
     private void validateFeedStorage(Feed feed) throws FalconException {
-        final Storage.TYPE storageType = FeedHelper.getStorageType(feed);
-        validateUniformStorageType(feed, storageType);
-        validatePartitions(feed, storageType);
+        final Storage.TYPE baseFeedStorageType = FeedHelper.getStorageType(feed);
+        validateMultipleSourcesExist(feed, baseFeedStorageType);
+        validateUniformStorageType(feed, baseFeedStorageType);
+        validatePartitions(feed, baseFeedStorageType);
         validateStorageExists(feed);
     }
 
+    private void validateMultipleSourcesExist(Feed feed, Storage.TYPE baseFeedStorageType) throws FalconException {
+        if (baseFeedStorageType == Storage.TYPE.FILESYSTEM) {
+            return;
+        }
+
+        // validate that there is only one source cluster
+        int numberOfSourceClusters = 0;
+        for (Cluster cluster : feed.getClusters().getClusters()) {
+            if (cluster.getType() == ClusterType.SOURCE) {
+                numberOfSourceClusters++;
+            }
+        }
+
+        if (numberOfSourceClusters > 1) {
+            throw new ValidationException("Multiple sources are not supported for feed with table storage: "
+                    + feed.getName());
+        }
+    }
+
     private void validateUniformStorageType(Feed feed, Storage.TYPE feedStorageType) throws FalconException {
         for (Cluster cluster : feed.getClusters().getClusters()) {
             Storage.TYPE feedClusterStorageType = FeedHelper.getStorageType(feed, cluster);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4fc7cd6c/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
index 81bfe0f..8647d43 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
@@ -79,10 +79,7 @@ public class ProcessEntityParser extends EntityParser<Process> {
                     CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName);
                     CrossEntityValidations.validateFeedRetentionPeriod(input.getStart(), feed, clusterName);
                     CrossEntityValidations.validateInstanceRange(process, input, feed);
-                    if (input.getPartition() != null) {
-                        CrossEntityValidations.validateInputPartition(input, feed);
-                    }
-
+                    validateInputPartition(input, feed);
                     validateOptionalInputsForTableStorage(feed, input);
                 }
             }
@@ -155,6 +152,19 @@ public class ProcessEntityParser extends EntityParser<Process> {
         }
     }
 
+    private void validateInputPartition(Input input, Feed feed) throws FalconException {
+        if (input.getPartition() == null) {
+            return;
+        }
+
+        final Storage.TYPE baseFeedStorageType = FeedHelper.getStorageType(feed);
+        if (baseFeedStorageType == Storage.TYPE.FILESYSTEM) {
+            CrossEntityValidations.validateInputPartition(input, feed);
+        } else if (baseFeedStorageType == Storage.TYPE.TABLE) {
+            throw new ValidationException("Input partitions are not supported for table storage: " + input.getName());
+        }
+    }
+
     private void validateDatasetName(Inputs inputs, Outputs outputs) throws ValidationException {
         Set<String> datasetNames = new HashSet<String>();
         if (inputs != null) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4fc7cd6c/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
index 66bdf5c..b90713e 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
@@ -458,4 +458,26 @@ public class FeedEntityParserTest extends AbstractTestBase {
     public void testParseInvalidFeedWithTable() throws FalconException {
         parser.parse(FeedEntityParserTest.class.getResourceAsStream("/config/feed/invalid-feed.xml"));
     }
+
+    @Test (expectedExceptions = FalconException.class)
+    public void testValidateFeedWithTableAndMultipleSources() throws FalconException {
+        parser.parseAndValidate(FeedEntityParserTest.class.getResourceAsStream(
+                "/config/feed/table-with-multiple-sources-feed.xml"));
+        Assert.fail("Should have thrown an exception:Multiple sources are not supported for feed with table storage");
+    }
+
+    @Test(expectedExceptions = ValidationException.class)
+    public void testValidatePartitionsForTable() throws Exception {
+        Feed feed = parser.parse(FeedEntityParserTest.class.getResourceAsStream("/config/feed/hive-table-feed.xml"));
+        Assert.assertNull(feed.getPartitions());
+
+        Partitions partitions = new Partitions();
+        Partition partition = new Partition();
+        partition.setName("colo");
+        partitions.getPartitions().add(partition);
+        feed.setPartitions(partitions);
+
+        parser.validate(feed);
+        Assert.fail("An exception should have been thrown:Partitions are not supported for feeds with table storage");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4fc7cd6c/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
index 4537bb3..e656772 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
@@ -315,4 +315,18 @@ public class ProcessEntityParserTest extends AbstractTestBase {
             Assert.assertTrue(e instanceof ValidationException);
         }
     }
+
+    @Test(expectedExceptions = ValidationException.class)
+    public void testValidateInputPartitionForTable() throws Exception {
+        Process process = parser.parse(
+                ProcessEntityParserTest.class.getResourceAsStream("/config/process/process-table.xml"));
+        if (process.getInputs() != null) {
+            for (Input input : process.getInputs().getInputs()) {
+                input.setPartition("region=usa");
+            }
+        }
+
+        parser.validate(process);
+        Assert.fail("An exception should have been thrown since Input partitions are not supported for table storage");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4fc7cd6c/common/src/test/resources/config/feed/table-with-multiple-sources-feed.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/table-with-multiple-sources-feed.xml b/common/src/test/resources/config/feed/table-with-multiple-sources-feed.xml
new file mode 100644
index 0000000..f84f3d4
--- /dev/null
+++ b/common/src/test/resources/config/feed/table-with-multiple-sources-feed.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="clicks log" name="clicks" xmlns="uri:falcon:feed:0.1">
+    <partitions>
+        <partition name="fraud"/>
+        <partition name="good"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="testCluster" type="source">
+            <validity start="2021-11-01T00:00Z" end="2021-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+        <cluster name="testCluster" type="source">
+            <validity start="2021-11-01T00:00Z" end="2021-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+        <cluster name="backupCluster" type="target">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(6)" action="archive"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+    </clusters>
+
+    <table uri="catalog:default:clicks#ds=$YEAR-$MONTH-$DAY-$HOUR" />
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+</feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4fc7cd6c/feed/src/main/resources/config/workflow/falcon-table-import.hql
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/falcon-table-import.hql b/feed/src/main/resources/config/workflow/falcon-table-import.hql
index 917b5b9..653d580 100644
--- a/feed/src/main/resources/config/workflow/falcon-table-import.hql
+++ b/feed/src/main/resources/config/workflow/falcon-table-import.hql
@@ -15,4 +15,6 @@
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
 --
-import table ${falconTargetDatabase}.${falconTargetTable} partition ${falconTargetPartition} from '${falconTargetStagingDir}';
+use ${falconTargetDatabase};
+alter table ${falconTargetTable} drop if exists partition ${falconTargetPartition};
+import table ${falconTargetTable} partition ${falconTargetPartition} from '${falconTargetStagingDir}';

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4fc7cd6c/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java
index 256d3b5..dbc6442 100644
--- a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java
@@ -119,10 +119,6 @@ public class TableStorageFeedReplicationIT {
 
     @AfterClass
     public void tearDown() throws Exception {
-        TestContext.executeWithURL("entity -delete -type feed -name customer-table-replicating-feed");
-        TestContext.executeWithURL("entity -delete -type cluster -name primary-cluster");
-        TestContext.executeWithURL("entity -delete -type cluster -name bcp-cluster");
-
         cleanupHiveMetastore(sourceMetastoreUrl, SOURCE_DATABASE_NAME, SOURCE_TABLE_NAME);
         cleanupHiveMetastore(targetMetastoreUrl, TARGET_DATABASE_NAME, TARGET_TABLE_NAME);
 
@@ -176,5 +172,66 @@ public class TableStorageFeedReplicationIT {
                 .accept(MediaType.APPLICATION_JSON)
                 .get(InstancesResult.class);
         Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED);
+
+        TestContext.executeWithURL("entity -delete -type feed -name customer-table-replicating-feed");
+        TestContext.executeWithURL("entity -delete -type cluster -name primary-cluster");
+        TestContext.executeWithURL("entity -delete -type cluster -name bcp-cluster");
+    }
+
+    @Test (enabled = false)
+    public void testTableReplicationWithExistingTargetPartition() throws Exception {
+        final String feedName = "customer-table-replicating-feed";
+        final Map<String, String> overlay = sourceContext.getUniqueOverlay();
+        String filePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+        filePath = targetContext.overlayParametersOverTemplate("/table/bcp-cluster.xml", overlay);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+        HCatPartition sourcePartition = HiveTestUtils.getPartition(
+                sourceMetastoreUrl, SOURCE_DATABASE_NAME, SOURCE_TABLE_NAME, "ds", PARTITION_VALUE);
+        Assert.assertNotNull(sourcePartition);
+
+        addPartitionToTarget();
+        // verify if the partition on the target exists before replication starts
+        // to see import drops partition before importing partition
+        HCatPartition targetPartition = HiveTestUtils.getPartition(
+                targetMetastoreUrl, TARGET_DATABASE_NAME, TARGET_TABLE_NAME, "ds", PARTITION_VALUE);
+        Assert.assertNotNull(targetPartition);
+
+        filePath = sourceContext.overlayParametersOverTemplate("/table/customer-table-replicating-feed.xml", overlay);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+
+        // wait until the workflow job completes
+        WorkflowJob jobInfo = OozieTestUtils.getWorkflowJob(targetContext.getCluster().getCluster(),
+                OozieClient.FILTER_NAME + "=FALCON_FEED_REPLICATION_" + feedName);
+        Assert.assertEquals(jobInfo.getStatus(), WorkflowJob.Status.SUCCEEDED);
+
+        // verify if the partition on the target exists
+        targetPartition = HiveTestUtils.getPartition(
+                targetMetastoreUrl, TARGET_DATABASE_NAME, TARGET_TABLE_NAME, "ds", PARTITION_VALUE);
+        Assert.assertNotNull(targetPartition);
+
+        InstancesResult response = targetContext.getService().path("api/instance/running/feed/" + feedName)
+                .header("Remote-User", "guest")
+                .accept(MediaType.APPLICATION_JSON)
+                .get(InstancesResult.class);
+        Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED);
+
+        TestContext.executeWithURL("entity -delete -type feed -name customer-table-replicating-feed");
+        TestContext.executeWithURL("entity -delete -type cluster -name primary-cluster");
+        TestContext.executeWithURL("entity -delete -type cluster -name bcp-cluster");
+    }
+
+    private void addPartitionToTarget() throws Exception {
+        final Cluster targetCluster = targetContext.getCluster().getCluster();
+        String targetStorageUrl = ClusterHelper.getStorageUrl(targetCluster);
+
+        // copyTestDataToHDFS
+        final String targetPath = targetStorageUrl + "/falcon/test/input/" + PARTITION_VALUE;
+        FSUtils.copyResourceToHDFS("/apps/data/data.txt", "data.txt", targetPath);
+
+        HiveTestUtils.loadData(targetMetastoreUrl, TARGET_DATABASE_NAME, TARGET_TABLE_NAME,
+                targetPath, PARTITION_VALUE);
     }
 }


Mime
View raw message