falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shweth...@apache.org
Subject git commit: FALCON-278 Changes in feed availability info doesn't update process. Contributed by Shwetha GS
Date Thu, 20 Feb 2014 06:59:46 GMT
Repository: incubator-falcon
Updated Branches:
  refs/heads/master d5b64722c -> 027efd592


FALCON-278 Changes in feed availability info doesn't update process. Contributed by Shwetha GS


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/027efd59
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/027efd59
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/027efd59

Branch: refs/heads/master
Commit: 027efd592fc5e030e74521f9dff4df32b5b7662a
Parents: d5b6472
Author: Shwetha GS <shwethags@gmail.com>
Authored: Thu Feb 20 12:27:59 2014 +0530
Committer: Shwetha GS <shwethags@gmail.com>
Committed: Thu Feb 20 12:29:15 2014 +0530

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 CHANGES.txt                                     |   2 +
 .../apache/falcon/entity/CatalogStorage.java    |   4 +
 .../apache/falcon/entity/FileSystemStorage.java |   4 +
 .../org/apache/falcon/update/UpdateHelper.java  | 100 ++++++-------------
 .../apache/falcon/update/UpdateHelperTest.java  |  54 +++++-----
 .../workflow/engine/OozieWorkflowEngine.java    |   2 +-
 7 files changed, 69 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/027efd59/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 510bcae..f823497 100644
--- a/.gitignore
+++ b/.gitignore
@@ -36,3 +36,4 @@ build
 
 #log files
 logs
+*.log
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/027efd59/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 26b76b0..67cfede 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -51,6 +51,8 @@ Trunk (Unreleased)
     FALCON-123 Improve build speeds in falcon. (Srikanth Sundarrajan via Shwetha GS)
 
   BUG FIXES
+    FALCON-278 Changes in feed availability info doesn't update process. (Shwetha GS)
+
     FALCON-239 Build failed on build-tools due to a missing SNAPSHOT. (Srikanth 
     Sundarrajan)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/027efd59/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
index ed9b238..37a05cb 100644
--- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
@@ -325,6 +325,10 @@ public class CatalogStorage implements Storage {
 
     @Override
     public boolean isIdentical(Storage toCompareAgainst) throws FalconException {
+        if (!(toCompareAgainst instanceof CatalogStorage)) {
+            return false;
+        }
+
         CatalogStorage catalogStorage = (CatalogStorage) toCompareAgainst;
 
         return !(getCatalogUrl() != null && !getCatalogUrl().equals(catalogStorage.getCatalogUrl()))

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/027efd59/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
index 41917c8..4bcf271 100644
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -171,6 +171,10 @@ public class FileSystemStorage implements Storage {
 
     @Override
     public boolean isIdentical(Storage toCompareAgainst) throws FalconException {
+        if (!(toCompareAgainst instanceof FileSystemStorage)) {
+            return false;
+        }
+
         FileSystemStorage fsStorage = (FileSystemStorage) toCompareAgainst;
         final List<Location> fsStorageLocations = fsStorage.getLocations();
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/027efd59/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
index 4580bad..dda6bb3 100644
--- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
+++ b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
@@ -18,20 +18,17 @@
 
 package org.apache.falcon.update;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.ProcessHelper;
 import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Partition;
-import org.apache.falcon.entity.v0.feed.Partitions;
-import org.apache.falcon.entity.v0.process.Cluster;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Inputs;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -44,7 +41,8 @@ import org.apache.log4j.Logger;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Helper methods to facilitate entity updates.
@@ -176,83 +174,43 @@ public final class UpdateHelper {
         }
     }
 
-    public static boolean shouldUpdate(Entity oldEntity, Entity newEntity, Entity affectedEntity)
+    public static boolean shouldUpdate(Entity oldEntity, Entity newEntity, Entity affectedEntity, String cluster)
         throws FalconException {
         if (oldEntity.getEntityType() == EntityType.FEED && affectedEntity.getEntityType() == EntityType.PROCESS) {
-            return shouldUpdate((Feed) oldEntity, (Feed) newEntity, (Process) affectedEntity);
-        } else {
-            LOG.debug(newEntity.toShortString());
-            LOG.debug(affectedEntity.toShortString());
-            throw new FalconException("Don't know what to do. Unexpected scenario");
-        }
-    }
-
-    public static boolean shouldUpdate(Feed oldFeed, Feed newFeed, Process affectedProcess)
-        throws FalconException {
-        Storage oldFeedStorage = FeedHelper.createStorage(oldFeed);
-        Storage newFeedStorage = FeedHelper.createStorage(newFeed);
 
-        if (!oldFeedStorage.isIdentical(newFeedStorage)) {
-            return true;
-        }
-        LOG.debug(oldFeed.toShortString() + ": Storage identical. Ignoring...");
+            Feed oldFeed = (Feed) oldEntity;
+            Feed newFeed = (Feed) newEntity;
+            Process affectedProcess = (Process) affectedEntity;
 
-        if (!oldFeed.getFrequency().equals(newFeed.getFrequency())) {
-            return true;
-        }
-        LOG.debug(oldFeed.toShortString() + ": Frequency identical. Ignoring...");
+            //check if affectedProcess is defined for this cluster
+            if (ProcessHelper.getCluster(affectedProcess, cluster) == null) {
+                LOG.debug("Process " + affectedProcess.getName() + " is not defined for cluster " + cluster);
+                return false;
+            }
 
-        // it is not possible to have oldFeed partitions as non empty and
-        // new being empty. validator should have gated this.
-        // Also if new partitions are added and old is empty, then there is
-        // nothing
-        // to update in process
-        boolean partitionApplicable = false;
-        Inputs affectedInputs = affectedProcess.getInputs();
-        if (affectedInputs != null && affectedInputs.getInputs() != null) {
-            for (Input input : affectedInputs.getInputs()) {
-                if (input.getFeed().equals(oldFeed.getName())) {
-                    if (input.getPartition() != null && !input.getPartition().isEmpty()) {
-                        partitionApplicable = true;
-                    }
-                }
+            if (!oldFeed.getFrequency().equals(newFeed.getFrequency())) {
+                LOG.debug(oldFeed.toShortString() + ": Frequency has changed. Updating...");
+                return true;
             }
-            if (partitionApplicable) {
-                LOG.debug("Partitions are applicable. Checking ...");
-                if (newFeed.getPartitions() != null && oldFeed.getPartitions() != null) {
-                    List<String> newParts = getPartitions(newFeed.getPartitions());
-                    List<String> oldParts = getPartitions(oldFeed.getPartitions());
-                    if (newParts.size() != oldParts.size()) {
-                        return true;
-                    }
-                    if (!newParts.containsAll(oldParts)) {
-                        return true;
-                    }
-                }
-                LOG.debug(oldFeed.toShortString() + ": Partitions identical. Ignoring...");
+
+            if (!StringUtils.equals(oldFeed.getAvailabilityFlag(), newFeed.getAvailabilityFlag())) {
+                LOG.debug(oldFeed.toShortString() + ": Availability flag has changed. Updating...");
+                return true;
             }
-        }
 
-        for (Cluster cluster : affectedProcess.getClusters().getClusters()) {
-            oldFeedStorage = FeedHelper.createStorage(cluster.getName(), oldFeed);
-            newFeedStorage = FeedHelper.createStorage(cluster.getName(), newFeed);
+            Storage oldFeedStorage = FeedHelper.createStorage(cluster, oldFeed);
+            Storage newFeedStorage = FeedHelper.createStorage(cluster, newFeed);
 
-            if (!FeedHelper.getCluster(oldFeed, cluster.getName()).getValidity().getStart()
-                    .equals(FeedHelper.getCluster(newFeed, cluster.getName()).getValidity().getStart())
-                    || !oldFeedStorage.isIdentical(newFeedStorage)) {
+            if (!oldFeedStorage.isIdentical(newFeedStorage)) {
+                LOG.debug(oldFeed.toShortString() + ": Storage has changed. Updating...");
                 return true;
             }
-            LOG.debug(oldFeed.toShortString() + ": Feed on cluster" + cluster.getName() + " identical. Ignoring...");
-        }
-
-        return false;
-    }
+            return false;
 
-    private static List<String> getPartitions(Partitions partitions) {
-        List<String> parts = new ArrayList<String>();
-        for (Partition partition : partitions.getPartitions()) {
-            parts.add(partition.getName());
+        } else {
+            LOG.debug(newEntity.toShortString());
+            LOG.debug(affectedEntity.toShortString());
+            throw new FalconException("Don't know what to do. Unexpected scenario");
         }
-        return parts;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/027efd59/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
index 43dcf4d..e532b24 100644
--- a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
@@ -161,7 +161,7 @@ public class UpdateHelperTest extends AbstractTestBase {
     }
 
     @Test
-    public void testShouldUpdate2() throws Exception {
+    public void testShouldUpdateProcess() throws Exception {
         Feed oldFeed = parser.parseAndValidate(this.getClass()
                 .getResourceAsStream(FEED_XML));
         String cluster = "testCluster";
@@ -191,60 +191,58 @@ public class UpdateHelperTest extends AbstractTestBase {
     }
 
     @Test
-    public void testShouldUpdate() throws Exception {
-        Feed oldFeed = parser.parseAndValidate(this.getClass()
-                .getResourceAsStream(FEED_XML));
+    public void testShouldUpdateFeed() throws Exception {
+        Feed oldFeed = parser.parseAndValidate(this.getClass().getResourceAsStream(FEED_XML));
 
         Feed newFeed = (Feed) oldFeed.copy();
-        Process process = processParser.parseAndValidate(this.getClass().
-                getResourceAsStream(PROCESS_XML));
+        Process process = processParser.parseAndValidate(this.getClass().getResourceAsStream(PROCESS_XML));
         prepare(process);
-        Process newProcess = (Process) process.copy();
+        String cluster = process.getClusters().getClusters().get(0).getName();
 
-        Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
+        Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process, cluster));
 
         newFeed.getLateArrival().setCutOff(Frequency.fromString("hours(1)"));
-        Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
+        Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process, cluster));
 
         newFeed.getLateArrival().setCutOff(oldFeed.getLateArrival().getCutOff());
-        Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
-
-        getLocation(newFeed, LocationType.DATA).setPath("/test");
-        Assert.assertTrue(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
-
-        getLocation(newFeed, LocationType.DATA).setPath(
-                getLocation(oldFeed, LocationType.DATA).getPath());
-        Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
+        getLocation(newFeed, LocationType.DATA, cluster).setPath("/test");
+        Assert.assertTrue(UpdateHelper.shouldUpdate(oldFeed, newFeed, process, cluster));
 
+        getLocation(newFeed, LocationType.DATA, cluster).setPath(
+                getLocation(oldFeed, LocationType.DATA, cluster).getPath());
         newFeed.setFrequency(Frequency.fromString("months(1)"));
-        Assert.assertTrue(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
+        Assert.assertTrue(UpdateHelper.shouldUpdate(oldFeed, newFeed, process, cluster));
 
         newFeed.setFrequency(oldFeed.getFrequency());
-        Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
-
         Partition partition = new Partition();
         partition.setName("1");
         newFeed.getPartitions().getPartitions().add(partition);
-        Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
+        Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process, cluster));
 
         Property property = new Property();
         property.setName("1");
         property.setValue("1");
         newFeed.setProperties(new Properties());
         newFeed.getProperties().getProperties().add(property);
-        Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
+        Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process, cluster));
 
         newFeed.getProperties().getProperties().remove(0);
-        Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
+        Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process, cluster));
 
         FeedHelper.getCluster(newFeed, process.getClusters().getClusters().get(0).getName()).getValidity().setStart(
                 SchemaHelper.parseDateUTC("2012-11-01T00:00Z"));
-        Assert.assertTrue(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
+        Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process, cluster));
 
         FeedHelper.getCluster(newFeed, process.getClusters().getClusters().get(0).getName()).getValidity().
                 setStart(FeedHelper.getCluster(oldFeed,
                         process.getClusters().getClusters().get(0).getName()).getValidity().getStart());
-        Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
+
+        //Change location to table should trigger process update
+        newFeed.setLocations(null);
+        CatalogTable table = new CatalogTable();
+        table.setUri("catalog:default:clicks-blah#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}");
+        newFeed.setTable(table);
+        Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process, cluster));
     }
 
     @Test
@@ -283,7 +281,11 @@ public class UpdateHelperTest extends AbstractTestBase {
         Assert.assertTrue(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster));
     }
 
-    private static Location getLocation(Feed feed, LocationType type) {
+    private static Location getLocation(Feed feed, LocationType type, String cluster) {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster);
+        if (feedCluster.getLocations() != null) {
+            return getLocation(feedCluster.getLocations(), type);
+        }
         return getLocation(feed.getLocations(), type);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/027efd59/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index cea73bd..d1ced99 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -921,7 +921,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             }
 
             LOG.info("Dependent entities need to be updated " + affectedEntity.toShortString());
-            if (!UpdateHelper.shouldUpdate(oldEntity, newEntity, affectedEntity)) {
+            if (!UpdateHelper.shouldUpdate(oldEntity, newEntity, affectedEntity, cluster)) {
                 continue;
             }
 


Mime
View raw message