falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ajayyad...@apache.org
Subject falcon git commit: FALCON-1483 Add Utils to common to support native scheduler. Contributed by Pallavi Rao.
Date Thu, 24 Sep 2015 06:11:52 GMT
Repository: falcon
Updated Branches:
  refs/heads/master cf492300e -> 6c8b33fd5


FALCON-1483 Add Utils to common to support native scheduler. Contributed by Pallavi Rao.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/6c8b33fd
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/6c8b33fd
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/6c8b33fd

Branch: refs/heads/master
Commit: 6c8b33fd582f05680d8ea56efa784d26fc936428
Parents: cf49230
Author: Ajay Yadava <ajaynsit@gmail.com>
Authored: Thu Sep 24 11:11:10 2015 +0530
Committer: Ajay Yadava <ajaynsit@gmail.com>
Committed: Thu Sep 24 11:11:10 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../org/apache/falcon/entity/EntityUtil.java    | 44 ++++++++++++++++++-
 .../apache/falcon/entity/EntityUtilTest.java    | 45 ++++++++++++++++++++
 3 files changed, 90 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/6c8b33fd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a54fbcf..d8d5a2a 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,8 @@ Trunk (Unreleased)
     FALCON-1027 Falcon proxy user support(Sowmya Ramesh)
 
   IMPROVEMENTS
+    FALCON-1483 Add Utils to common to support native scheduler(Pallavi Rao via Ajay Yadava)
+
     FALCON-1417 Make validity end date optional for feed / process(Pragya Mittal via Ajay Yadava)
 
     FALCON-1434 Enhance schedule API to accept key-value properties(Pallavi Rao)   

http://git-wip-us.apache.org/repos/asf/falcon/blob/6c8b33fd/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index ad41674..3ab9339 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -47,8 +47,10 @@ import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.resource.EntityList;
 import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.util.RuntimeProperties;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,6 +87,7 @@ public final class EntityUtil {
     private static final long ONE_MS = 1;
 
     public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
+    private static final String STAGING_DIR_NAME_SEPARATOR = "_";
 
     private EntityUtil() {}
 
@@ -357,6 +360,10 @@ public final class EntityUtil {
             return -1;
         }
 
+        if (tz == null) {
+            tz = TimeZone.getTimeZone("UTC");
+        }
+
         Calendar startCal = Calendar.getInstance(tz);
         startCal.setTime(startTime);
 
@@ -657,13 +664,48 @@ public final class EntityUtil {
                 "falcon/workflows/" + entity.getEntityType().name().toLowerCase() + "/" +
entity.getName());
     }
 
+    /**
+     * Gets the latest staging path for an entity on a cluster, based on the dir name(that contains timestamp).
+     * @param cluster
+     * @param entity
+     * @return
+     * @throws FalconException
+     */
+    public static Path getLatestStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, final Entity entity)
+        throws FalconException {
+        Path basePath = getBaseStagingPath(cluster, entity);
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                ClusterHelper.getConfiguration(cluster));
+        try {
+            final String md5 = md5(getClusterView(entity, cluster.getName()));
+            FileStatus[] files = fs.listStatus(basePath, new PathFilter() {
+                @Override
+                public boolean accept(Path path) {
+                    return path.getName().startsWith(md5);
+                }
+            });
+            if (files != null && files.length != 0) {
+                // Find the latest directory using the timestamp used in the dir name
+                // These files will vary only in ts suffix (as we have filtered out using a common md5 hash),
+                // hence, sorting will be on timestamp.
+                // FileStatus compares on Path and hence the latest will be at the end after sorting.
+                Arrays.sort(files);
+                return files[files.length - 1].getPath();
+            }
+            throw new FalconException("No staging directories found for entity " + entity.getName() + " on cluster "
+                + cluster.getName());
+        } catch (Exception e) {
+            throw new FalconException("Unable get listing for " + basePath.toString(), e);
+        }
+    }
+
     //Creates new staging path for entity schedule/update
     //Staging path containd md5 of the cluster view of the entity. This is required to check if update is required
     public static Path getNewStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity)
         throws FalconException {
         Entity clusterView = getClusterView(entity, cluster.getName());
         return new Path(getBaseStagingPath(cluster, entity),
-            md5(clusterView) + "_" + String.valueOf(System.currentTimeMillis()));
+            md5(clusterView) + STAGING_DIR_NAME_SEPARATOR + String.valueOf(System.currentTimeMillis()));
     }
 
     // Given an entity and a cluster, determines if the supplied path is the staging path for that entity.

http://git-wip-us.apache.org/repos/asf/falcon/blob/6c8b33fd/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
index d022bae..c87449c 100644
--- a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
@@ -38,6 +38,7 @@ import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import java.io.IOException;
 import java.io.InputStream;
 import java.text.DateFormat;
 import java.text.ParseException;
@@ -405,4 +406,48 @@ public class EntityUtilTest extends AbstractTestBase {
             {":value"},
         };
     }
+
+    @Test
+    public void testGetLatestStagingPath() throws FalconException, IOException {
+        ClusterEntityParser parser = (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER);
+        InputStream stream = this.getClass().getResourceAsStream(CLUSTER_XML);
+        org.apache.falcon.entity.v0.cluster.Cluster cluster = parser.parse(stream);
+
+        ProcessEntityParser processParser = (ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS);
+        stream = this.getClass().getResourceAsStream(PROCESS_XML);
+        Process process = processParser.parse(stream);
+        process.setName("staging-test");
+
+        String md5 = EntityUtil.md5(EntityUtil.getClusterView(process, "testCluster"));
+        FileSystem fs = HadoopClientFactory.get().
+                createFalconFileSystem(ClusterHelper.getConfiguration(cluster));
+
+        String basePath = "/projects/falcon/staging/falcon/workflows/process/staging-test/";
+        Path[] paths = {
+            new Path(basePath + "5a8100dc460b44db2e7bfab84b24cb92_1436441045003"),
+            new Path(basePath + "6b3a1b6c7cf9de62c78b125415ffb70c_1436504488677"),
+            new Path(basePath + md5 + "_1436344303117"),
+            new Path(basePath + md5 + "_1436347924846"),
+            new Path(basePath + md5 + "_1436357052992"),
+            new Path(basePath + "logs"),
+            new Path(basePath + "random_dir"),
+        };
+
+        // Ensure exception is thrown when there are no staging dirs.
+        fs.delete(new Path(basePath), true);
+        try {
+            EntityUtil.getLatestStagingPath(cluster, process);
+            Assert.fail("Exception expected");
+        } catch (FalconException e) {
+            // Do nothing
+        }
+
+        // Now create paths
+        for (Path path : paths) {
+            fs.create(path);
+        }
+
+        // Ensure latest is returned.
+        Assert.assertEquals(EntityUtil.getLatestStagingPath(cluster, process).getName(), md5 + "_1436357052992");
+    }
 }


Mime
View raw message