falcon-commits mailing list archives

From shweth...@apache.org
Subject [1/2] FALCON-206 Process update for wf changes. Contributed by Shwetha GS
Date Mon, 06 Jan 2014 05:51:42 GMT
Updated Branches:
  refs/heads/master 8b479426e -> 2ded62914


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
index 17227bf..16ef9c2 100644
--- a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
+++ b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
@@ -18,13 +18,13 @@
 package org.apache.falcon.replication;
 
 import org.apache.commons.cli.*;
+import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.Storage;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.tools.DistCp;
 import org.apache.hadoop.tools.DistCpOptions;
 import org.apache.hadoop.util.Tool;
@@ -139,8 +139,8 @@ public class FeedReplicator extends Configured implements Tool {
         FileStatus[] files = fs.globStatus(new Path(targetPath.toString() + "/" + fixedPath));
         if (files != null) {
             for (FileStatus file : files) {
-                fs.create(new Path(file.getPath(), FileOutputCommitter.SUCCEEDED_FILE_NAME)).close();
-                LOG.info("Created " + new Path(file.getPath(), FileOutputCommitter.SUCCEEDED_FILE_NAME));
+                fs.create(new Path(file.getPath(), EntityUtil.SUCCEEDED_FILE_NAME)).close();
+                LOG.info("Created " + new Path(file.getPath(), EntityUtil.SUCCEEDED_FILE_NAME));
             }
         } else {
             LOG.info("No files present in path: "

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java b/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java
index 9c97a44..52fcffe 100644
--- a/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java
+++ b/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java
@@ -18,9 +18,9 @@
 
 package org.apache.falcon.replication;
 
+import org.apache.falcon.entity.EntityUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.tools.DistCpOptions;
 import org.apache.hadoop.tools.SimpleCopyListing;
@@ -31,7 +31,7 @@ import java.util.regex.Pattern;
 
 /**
  * An implementation of CopyListing that overrides the default behavior by suppressing file,
- * FileOutputCommitter.SUCCEEDED_FILE_NAME and copies that in the last so downstream apps
+ * EntityUtil.SUCCEEDED_FILE_NAME and copies that in the last so downstream apps
  * depending on data availability will work correctly.
  */
 public class FilteredCopyListing extends SimpleCopyListing {
@@ -66,7 +66,7 @@ public class FilteredCopyListing extends SimpleCopyListing {
 
     @Override
     protected boolean shouldCopy(Path path, DistCpOptions options) {
-        if (path.getName().equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
+        if (path.getName().equals(EntityUtil.SUCCEEDED_FILE_NAME)) {
             return false;
         }
         return regex == null || regex.matcher(path.toString()).find();
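
The override above boils down to a simple predicate: skip the success marker so it can be written last, and otherwise honor an optional filter regex. A minimal standalone sketch of that rule, decoupled from DistCp's CopyListing API (the class and field names here are illustrative only):

import java.util.regex.Pattern;

import org.apache.hadoop.fs.Path;

public class CopyFilterSketch {
    private final Pattern regex;        // null means "copy everything"
    private final String succeededName; // e.g. "_SUCCESS"

    public CopyFilterSketch(Pattern regex, String succeededName) {
        this.regex = regex;
        this.succeededName = succeededName;
    }

    // Mirrors shouldCopy(): the marker file is never copied; everything else
    // passes unless a configured regex rejects it.
    public boolean shouldCopy(Path path) {
        if (path.getName().equals(succeededName)) {
            return false;
        }
        return regex == null || regex.matcher(path.toString()).find();
    }
}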

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/src/bin/service-start.sh
----------------------------------------------------------------------
diff --git a/src/bin/service-start.sh b/src/bin/service-start.sh
index 430bb1a..6d06ede 100755
--- a/src/bin/service-start.sh
+++ b/src/bin/service-start.sh
@@ -56,4 +56,4 @@ nohup ${JAVA_BIN} ${JAVA_PROPERTIES} -cp ${FALCONCPPATH} org.apache.falcon.Main
 echo $! > $FALCON_PID_FILE
 popd > /dev/null
 
-echo "$APP_TYPE started using hadoop version: " `${JAVA_BIN} ${JAVA_PROPERTIES} -cp ${FALCONCPPATH}
org.apache.hadoop.util.VersionInfo | head -1`
+echo "$APP_TYPE started using hadoop version: " `${JAVA_BIN} -cp ${FALCONCPPATH} org.apache.hadoop.util.VersionInfo
2> /dev/null | head -1`
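
The startup banner shells out to Hadoop's VersionInfo class on the Falcon classpath and keeps only the first line of its output. The same information is available programmatically, as in this small sketch (assuming the Hadoop common jar is on the classpath):

import org.apache.hadoop.util.VersionInfo;

public class HadoopVersionSketch {
    public static void main(String[] args) {
        // VersionInfo.main() prints a multi-line report; getVersion() returns
        // only the version string carried in its first line.
        System.out.println("hadoop version: " + VersionInfo.getVersion());
    }
}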

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/src/conf/log4j.xml
----------------------------------------------------------------------
diff --git a/src/conf/log4j.xml b/src/conf/log4j.xml
index 0b28ddd..58ebd80 100644
--- a/src/conf/log4j.xml
+++ b/src/conf/log4j.xml
@@ -42,15 +42,6 @@
         </layout>
     </appender>
 
-    <appender name="TRANSACTIONLOG" class="org.apache.log4j.DailyRollingFileAppender">
-        <param name="File" value="${falcon.log.dir}/${falcon.app.type}.tranlog.log"/>
-        <param name="Append" value="true"/>
-        <param name="Threshold" value="debug"/>
-        <layout class="org.apache.log4j.PatternLayout">
-            <param name="ConversionPattern" value="%d %x %m%n"/>
-        </layout>
-    </appender>
-
     <appender name="METRIC" class="org.apache.log4j.DailyRollingFileAppender">
         <param name="File" value="${falcon.log.dir}/${falcon.app.type}.metric.log"/>
         <param name="Append" value="true"/>
@@ -70,11 +61,6 @@
         <appender-ref ref="AUDIT"/>
     </logger>
 
-    <logger name="TRANSACTIONLOG">
-        <level value="info"/>
-        <appender-ref ref="TRANSACTIONLOG"/>
-    </logger>
-
     <logger name="METRIC">
         <level value="info"/>
         <appender-ref ref="METRIC"/>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 69613f6..79cd211 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -27,7 +27,6 @@
 *.workflow.engine.impl=org.apache.falcon.workflow.engine.OozieWorkflowEngine
 *.oozie.process.workflow.builder=org.apache.falcon.workflow.OozieProcessWorkflowBuilder
 *.oozie.feed.workflow.builder=org.apache.falcon.workflow.OozieFeedWorkflowBuilder
-*.journal.impl=org.apache.falcon.transaction.SharedFileSystemJournal
 *.SchedulableEntityManager.impl=org.apache.falcon.resource.SchedulableEntityManager
 *.ConfigSyncService.impl=org.apache.falcon.resource.ConfigSyncService
 *.ProcessInstanceManager.impl=org.apache.falcon.resource.InstanceManager

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/webapp/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/webapp/src/main/resources/log4j.xml b/webapp/src/main/resources/log4j.xml
index 6790576..d133b8e 100644
--- a/webapp/src/main/resources/log4j.xml
+++ b/webapp/src/main/resources/log4j.xml
@@ -45,15 +45,6 @@
         </layout>
     </appender>
 
-    <appender name="TRANSACTIONLOG" class="org.apache.log4j.DailyRollingFileAppender">
-        <param name="File" value="${user.dir}/target/logs/tranlog.log"/>
-        <param name="Append" value="true"/>
-        <param name="Threshold" value="debug"/>
-        <layout class="org.apache.log4j.PatternLayout">
-            <param name="ConversionPattern" value="%d %x %m%n"/>
-        </layout>
-    </appender>
-
     <appender name="METRIC" class="org.apache.log4j.DailyRollingFileAppender">
         <param name="File" value="${user.dir}/target/logs/metric.log"/>
         <param name="Append" value="true"/>
@@ -73,11 +64,6 @@
         <appender-ref ref="AUDIT"/>
     </logger>
 
-    <logger name="TRANSACTIONLOG">
-        <level value="info"/>
-        <appender-ref ref="TRANSACTIONLOG"/>
-    </logger>
-
     <logger name="METRIC">
         <level value="info"/>
         <appender-ref ref="METRIC"/>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/webapp/src/test/java/org/apache/falcon/logging/LogMoverIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/logging/LogMoverIT.java b/webapp/src/test/java/org/apache/falcon/logging/LogMoverIT.java
index 24c2959..e3cd914 100644
--- a/webapp/src/test/java/org/apache/falcon/logging/LogMoverIT.java
+++ b/webapp/src/test/java/org/apache/falcon/logging/LogMoverIT.java
@@ -139,9 +139,7 @@ public class LogMoverIT {
     }
 
     private Path getLogPath() throws FalconException {
-        Path stagingPath = new Path(ClusterHelper.getLocation(
-                testCluster.getCluster(), "staging"),
-                EntityUtil.getStagingPath(testProcess) + "/../logs");
+        Path stagingPath = EntityUtil.getLogPath(testCluster.getCluster(), testProcess);
         return new Path(ClusterHelper.getStorageUrl(testCluster
                 .getCluster()), stagingPath);
     }
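
Both this test and OozieTestUtils below now delegate staging/logs path construction to EntityUtil.getLogPath, which is added elsewhere in this change set. Judging only from the lines removed here, an equivalent helper could be sketched as follows (a reconstruction under that assumption, not the committed implementation):

import org.apache.falcon.FalconException;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.hadoop.fs.Path;

public final class LogPathSketch {
    private LogPathSketch() {}

    // Same effect as the removed lines: the cluster's staging location,
    // the entity's staging directory, then up one level into "logs".
    public static Path getLogPath(Cluster cluster, Entity entity) throws FalconException {
        return new Path(ClusterHelper.getLocation(cluster, "staging"),
                EntityUtil.getStagingPath(entity) + "/../logs");
    }
}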

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
index cb2fcbb..60fd320 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
@@ -33,6 +33,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.xml.bind.JAXBException;
 
+import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.feed.*;
 import org.apache.falcon.entity.v0.process.Input;
@@ -63,6 +64,8 @@ import com.sun.jersey.api.client.ClientResponse;
  */
 public class EntityManagerJerseyIT {
 
+    private static final int ONE_HR = 2 * 24 * 60 * 60 * 1000;
+
     @BeforeClass
     public void prepare() throws Exception {
         TestContext.prepare();
@@ -75,6 +78,19 @@ public class EntityManagerJerseyIT {
         Assert.assertTrue(libs[0].getPath().getName().startsWith("falcon-hadoop-dependencies"));
     }
 
+    private Entity getDefinition(TestContext context, EntityType type, String name) throws Exception {
+        ClientResponse response =
+                context.service.path("api/entities/definition/" + type.name().toLowerCase() + "/" + name)
+                .header("Remote-User", TestContext.REMOTE_USER)
+                .accept(MediaType.TEXT_XML).get(ClientResponse.class);
+        return (Entity) type.getUnmarshaller().unmarshal(new StringReader(response.getEntity(String.class)));
+    }
+
+    private void updateEndtime(Process process) {
+        Validity processValidity = process.getClusters().getClusters().get(0).getValidity();
+        processValidity.setEnd(new Date(new Date().getTime() + ONE_HR));
+    }
+
     @Test
     public void testLibExtensions() throws Exception {
         TestContext context = newContext();
@@ -100,14 +116,23 @@ public class EntityManagerJerseyIT {
         context.assertSuccessful(response);
     }
 
+    private void update(TestContext context, Entity entity) throws Exception {
+        File tmpFile = context.getTempFile();
+        entity.getEntityType().getMarshaller().marshal(entity, tmpFile);
+        ClientResponse response = context.service.path("api/entities/update/"
+                + entity.getEntityType().name().toLowerCase() + "/" + entity.getName())
+                .header("Remote-User", TestContext.REMOTE_USER).accept(MediaType.TEXT_XML)
+                .post(ClientResponse.class, context.getServletInputStream(tmpFile.getAbsolutePath()));
+        context.assertSuccessful(response);
+    }
+
     @Test
     public void testUpdateCheckUser() throws Exception {
         TestContext context = newContext();
         Map<String, String> overlay = context.getUniqueOverlay();
         String tmpFileName = context.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
         Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(new File(tmpFileName));
-        Validity processValidity = process.getClusters().getClusters().get(0).getValidity();
-        processValidity.setEnd(new Date(new Date().getTime() + 2 * 24 * 60 * 60 * 1000));
+        updateEndtime(process);
         File tmpFile = context.getTempFile();
         EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
         context.scheduleProcess(tmpFile.getAbsolutePath(), overlay);
@@ -117,22 +142,11 @@ public class EntityManagerJerseyIT {
         Assert.assertEquals(bundles.size(), 1);
         Assert.assertEquals(bundles.get(0).getUser(), TestContext.REMOTE_USER);
 
-        ClientResponse response = context.service.path("api/entities/definition/feed/"
-                + context.outputFeedName).header(
-                "Remote-User", TestContext.REMOTE_USER)
-                .accept(MediaType.TEXT_XML).get(ClientResponse.class);
-        Feed feed = (Feed) EntityType.FEED.getUnmarshaller()
-                .unmarshal(new StringReader(response.getEntity(String.class)));
+        Feed feed = (Feed) getDefinition(context, EntityType.FEED, context.outputFeedName);
 
         //change output feed path and update feed as another user
         feed.getLocations().getLocations().get(0).setPath("/falcon/test/output2/${YEAR}/${MONTH}/${DAY}");
-        tmpFile = context.getTempFile();
-        EntityType.FEED.getMarshaller().marshal(feed, tmpFile);
-        response = context.service.path("api/entities/update/feed/"
-                + context.outputFeedName).header("Remote-User",
-                TestContext.REMOTE_USER).accept(MediaType.TEXT_XML)
-                .post(ClientResponse.class, context.getServletInputStream(tmpFile.getAbsolutePath()));
-        context.assertSuccessful(response);
+        update(context, feed);
 
         bundles = context.getBundles();
         Assert.assertEquals(bundles.size(), 2);
@@ -217,6 +231,27 @@ public class EntityManagerJerseyIT {
     }
 
     @Test
+    public void testUserWorkflowUpdate() throws Exception {
+        //schedule a process
+        TestContext context = newContext();
+        context.scheduleProcess();
+        context.waitForBundleStart(Job.Status.RUNNING);
+        List<BundleJob> bundles = context.getBundles();
+        Assert.assertEquals(bundles.size(), 1);
+
+        //create new file in user workflow
+        FileSystem fs = context.cluster.getFileSystem();
+        fs.create(new Path("/falcon/test/workflow", "newfile")).close();
+
+        //update process should create new bundle
+        Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName);
+        updateEndtime(process);
+        update(context, process);
+        bundles = context.getBundles();
+        Assert.assertEquals(bundles.size(), 2);
+    }
+
+    @Test
     public void testProcessInputUpdate() throws Exception {
         TestContext context = newContext();
         context.scheduleProcess();
@@ -226,18 +261,12 @@ public class EntityManagerJerseyIT {
         OozieClient ozClient = context.getOozieClient();
         String coordId = ozClient.getBundleJobInfo(bundles.get(0).getId()).getCoordinators().get(0).getId();
 
-        ClientResponse response = context.service.path("api/entities/definition/process/"
-                + context.processName).header(
-                "Remote-User", TestContext.REMOTE_USER)
-                .accept(MediaType.TEXT_XML).get(ClientResponse.class);
-        Process process = (Process) EntityType.PROCESS.getUnmarshaller()
-                .unmarshal(new StringReader(response.getEntity(String.class)));
-
+        Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName);
         String feed3 = "f3" + System.currentTimeMillis();
         Map<String, String> overlay = new HashMap<String, String>();
         overlay.put("inputFeedName", feed3);
         overlay.put("cluster", context.clusterName);
-        response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED);
+        ClientResponse response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED);
         context.assertSuccessful(response);
 
         Input input = new Input();
@@ -247,15 +276,8 @@ public class EntityManagerJerseyIT {
         input.setEnd("today(20,20)");
         process.getInputs().getInputs().add(input);
 
-        Validity processValidity = process.getClusters().getClusters().get(0).getValidity();
-        processValidity.setEnd(new Date(new Date().getTime() + 2 * 24 * 60 * 60 * 1000));
-        File tmpFile = context.getTempFile();
-        EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
-        response = context.service.path("api/entities/update/process/"
-                + context.processName).header("Remote-User",
-                TestContext.REMOTE_USER).accept(MediaType.TEXT_XML)
-                .post(ClientResponse.class, context.getServletInputStream(tmpFile.getAbsolutePath()));
-        context.assertSuccessful(response);
+        updateEndtime(process);
+        update(context, process);
 
         //Assert that update creates new bundle and old coord is running
         bundles = context.getBundles();
@@ -269,21 +291,9 @@ public class EntityManagerJerseyIT {
         context.scheduleProcess();
         context.waitForBundleStart(Job.Status.RUNNING);
 
-        ClientResponse response = context.service.path("api/entities/definition/process/"
-                + context.processName).header(
-                "Remote-User", TestContext.REMOTE_USER)
-                .accept(MediaType.TEXT_XML).get(ClientResponse.class);
-        Process process = (Process) EntityType.PROCESS.getUnmarshaller()
-                .unmarshal(new StringReader(response.getEntity(String.class)));
-
-        Validity processValidity = process.getClusters().getClusters().get(0).getValidity();
-        processValidity.setEnd(new Date(new Date().getTime() + 60 * 60 * 1000));
-        File tmpFile = context.getTempFile();
-        EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
-        response = context.service.path("api/entities/update/process/" + context.processName).header("Remote-User",
-                TestContext.REMOTE_USER).accept(MediaType.TEXT_XML)
-                .post(ClientResponse.class, context.getServletInputStream(tmpFile.getAbsolutePath()));
-        context.assertSuccessful(response);
+        Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName);
+        updateEndtime(process);
+        update(context, process);
 
         //Assert that update does not create new bundle
         List<BundleJob> bundles = context.getBundles();
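
The refactoring above funnels the repeated Jersey client calls through getDefinition() and update(), and the new testUserWorkflowUpdate checks that touching a file under the user workflow directory makes the next process update roll a new Oozie bundle. For reference, a minimal standalone sketch of the two REST calls those helpers wrap, using the Jersey 1.x client directly (the base URL and Remote-User value are placeholders; the tests obtain both from TestContext):

import javax.ws.rs.core.MediaType;

import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;

public final class FalconRestSketch {
    public static void main(String[] args) {
        WebResource service = Client.create().resource("http://localhost:15000/");

        // fetch an entity definition, as getDefinition() does
        ClientResponse get = service.path("api/entities/definition/process/sample-process")
                .header("Remote-User", "guest")
                .accept(MediaType.TEXT_XML)
                .get(ClientResponse.class);
        String definitionXml = get.getEntity(String.class);

        // post the (possibly edited) definition back, as update() does
        ClientResponse post = service.path("api/entities/update/process/sample-process")
                .header("Remote-User", "guest")
                .accept(MediaType.TEXT_XML)
                .type(MediaType.TEXT_XML)
                .post(ClientResponse.class, definitionXml);
        System.out.println("update status: " + post.getStatus());
    }
}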

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index d224f90..3fcd5dc 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -414,6 +414,18 @@ public class TestContext {
         prepare(TestContext.CLUSTER_TEMPLATE);
     }
 
+    private static void mkdir(FileSystem fs, Path path) throws Exception {
+        if (!fs.exists(path) && !fs.mkdirs(path)) {
+            throw new Exception("mkdir failed for " + path);
+        }
+    }
+
+    private static void mkdir(FileSystem fs, Path path, FsPermission perm) throws Exception {
+        if (!fs.exists(path) && !fs.mkdirs(path, perm)) {
+            throw new Exception("mkdir failed for " + path);
+        }
+    }
+
     public static void prepare(String clusterTemplate) throws Exception {
         // setup a logged in user
         CurrentUser.authenticate(REMOTE_USER);
@@ -429,19 +441,18 @@ public class TestContext {
 
         // setup dependent workflow and lipath in hdfs
         FileSystem fs = FileSystem.get(cluster.getConf());
-        fs.mkdirs(new Path("/falcon"), new FsPermission((short) 511));
+        mkdir(fs, new Path("/falcon"), new FsPermission((short) 511));
 
         Path wfParent = new Path("/falcon/test");
         fs.delete(wfParent, true);
         Path wfPath = new Path(wfParent, "workflow");
-        fs.mkdirs(wfPath);
+        mkdir(fs, wfPath);
         fs.copyFromLocalFile(false, true,
                 new Path(TestContext.class.getResource("/fs-workflow.xml").getPath()),
                 new Path(wfPath, "workflow.xml"));
-        fs.mkdirs(new Path(wfParent, "input/2012/04/20/00"));
+        mkdir(fs, new Path(wfParent, "input/2012/04/20/00"));
         Path outPath = new Path(wfParent, "output");
-        fs.mkdirs(outPath);
-        fs.setPermission(outPath, new FsPermission((short) 511));
+        mkdir(fs, outPath, new FsPermission((short) 511));
     }
 
     public static void cleanupStore() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java b/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java
index 690fb6b..3769dde 100644
--- a/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java
+++ b/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java
@@ -67,9 +67,7 @@ public final class OozieTestUtils {
     }
 
     public static Path getOozieLogPath(Cluster cluster, WorkflowJob jobInfo) throws Exception {
-
-        Path stagingPath = new Path(ClusterHelper.getLocation(cluster, "staging"),
-                EntityUtil.getStagingPath(cluster) + "/../logs");
+        Path stagingPath = EntityUtil.getLogPath(cluster, cluster);
         final Path logPath = new Path(ClusterHelper.getStorageUrl(cluster), stagingPath);
         LogMover.main(new String[] {
             "-workflowEngineUrl", ClusterHelper.getOozieUrl(cluster),

