falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srik...@apache.org
Subject [14/47] Fixes for Checkstyle
Date Fri, 26 Apr 2013 15:50:30 GMT
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
index 1b4a93d..3a7f4ae 100644
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
@@ -18,7 +18,6 @@
 package org.apache.falcon.converter;
 
 import junit.framework.Assert;
-import org.apache.hadoop.fs.Path;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
 import org.apache.falcon.entity.ClusterHelper;
@@ -31,6 +30,7 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
 import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.coordinator.SYNCDATASET;
+import org.apache.hadoop.fs.Path;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -42,96 +42,97 @@ import java.util.List;
 import static org.testng.Assert.assertEquals;
 
 public class OozieFeedMapperTest {
-	private EmbeddedCluster srcMiniDFS;
-	private EmbeddedCluster trgMiniDFS;
-	ConfigurationStore store = ConfigurationStore.get();
-	Cluster srcCluster;
-	Cluster trgCluster;
-	Feed feed;
+    private EmbeddedCluster srcMiniDFS;
+    private EmbeddedCluster trgMiniDFS;
+    ConfigurationStore store = ConfigurationStore.get();
+    Cluster srcCluster;
+    Cluster trgCluster;
+    Feed feed;
 
-	private static final String SRC_CLUSTER_PATH = "/src-cluster.xml";
-	private static final String TRG_CLUSTER_PATH = "/trg-cluster.xml";
-	private static final String FEED = "/feed.xml";
+    private static final String SRC_CLUSTER_PATH = "/src-cluster.xml";
+    private static final String TRG_CLUSTER_PATH = "/trg-cluster.xml";
+    private static final String FEED = "/feed.xml";
 
-	@BeforeClass
-	public void setUpDFS() throws Exception {
-		srcMiniDFS = EmbeddedCluster.newCluster("cluster1", false);
-		String srcHdfsUrl = srcMiniDFS.getConf().get("fs.default.name");
+    @BeforeClass
+    public void setUpDFS() throws Exception {
+        srcMiniDFS = EmbeddedCluster.newCluster("cluster1", false);
+        String srcHdfsUrl = srcMiniDFS.getConf().get("fs.default.name");
 
-		trgMiniDFS = EmbeddedCluster.newCluster("cluster2", false);
-		String trgHdfsUrl = trgMiniDFS.getConf().get("fs.default.name");
+        trgMiniDFS = EmbeddedCluster.newCluster("cluster2", false);
+        String trgHdfsUrl = trgMiniDFS.getConf().get("fs.default.name");
 
-		cleanupStore();
+        cleanupStore();
 
-		srcCluster = (Cluster) storeEntity(EntityType.CLUSTER, SRC_CLUSTER_PATH);
-		ClusterHelper.getInterface(srcCluster, Interfacetype.WRITE).setEndpoint(srcHdfsUrl);
+        srcCluster = (Cluster) storeEntity(EntityType.CLUSTER, SRC_CLUSTER_PATH);
+        ClusterHelper.getInterface(srcCluster, Interfacetype.WRITE).setEndpoint(srcHdfsUrl);
 
-		trgCluster = (Cluster) storeEntity(EntityType.CLUSTER, TRG_CLUSTER_PATH);
+        trgCluster = (Cluster) storeEntity(EntityType.CLUSTER, TRG_CLUSTER_PATH);
         ClusterHelper.getInterface(trgCluster, Interfacetype.WRITE).setEndpoint(trgHdfsUrl);
 
-		feed = (Feed) storeEntity(EntityType.FEED, FEED);
-
-	}
-
-	protected Entity storeEntity(EntityType type, String path) throws Exception {
-		Unmarshaller unmarshaller = type.getUnmarshaller();
-		Entity entity = (Entity) unmarshaller
-				.unmarshal(OozieFeedMapperTest.class.getResource(path));
-		store.publish(type, entity);
-		return entity;
-	}
-
-	protected void cleanupStore() throws FalconException {
-		for (EntityType type : EntityType.values()) {
-			Collection<String> entities = store.getEntities(type);
-			for (String entity : entities)
-				store.remove(type, entity);
-		}
-	}
-
-	@AfterClass
-	public void stopDFS() {
-		srcMiniDFS.shutdown();
-		trgMiniDFS.shutdown();
-	}
-
-	@Test
-	public void testReplicationCoords() throws FalconException {
-		OozieFeedMapper feedMapper = new OozieFeedMapper(feed);
-		List<COORDINATORAPP> coords = feedMapper.getCoordinators(trgCluster,
-				new Path("/projects/falcon/"));
-		COORDINATORAPP coord = coords.get(0);
-
-		Assert.assertEquals("${nameNode}/projects/falcon/REPLICATION", coord
-				.getAction().getWorkflow().getAppPath());
-		Assert.assertEquals("FALCON_FEED_REPLICATION_" + feed.getName() + "_"
-				+ srcCluster.getName(), coord.getName());
-		Assert.assertEquals("${coord:minutes(20)}", coord.getFrequency());
-		SYNCDATASET inputDataset = (SYNCDATASET) coord.getDatasets()
-				.getDatasetOrAsyncDataset().get(0);
-		SYNCDATASET outputDataset = (SYNCDATASET) coord.getDatasets()
-				.getDatasetOrAsyncDataset().get(1);
-
-		Assert.assertEquals("${coord:minutes(20)}", inputDataset.getFrequency());
-		Assert.assertEquals("input-dataset", inputDataset.getName());
-		Assert.assertEquals(
-				ClusterHelper.getStorageUrl(srcCluster)
-						+ "/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
-				inputDataset.getUriTemplate());
-
-		Assert.assertEquals("${coord:minutes(20)}",
-				outputDataset.getFrequency());
-		Assert.assertEquals("output-dataset", outputDataset.getName());
-		Assert.assertEquals(
-				"${nameNode}"
-						+ "/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
-				outputDataset.getUriTemplate());
-        for(Property prop:coord.getAction().getWorkflow().getConfiguration().getProperty()){
-        	if(prop.getName().equals("mapred.job.priority")){
-        		assertEquals(prop.getValue(), "NORMAL");
-        		break;
-        	}
+        feed = (Feed) storeEntity(EntityType.FEED, FEED);
+
+    }
+
+    protected Entity storeEntity(EntityType type, String path) throws Exception {
+        Unmarshaller unmarshaller = type.getUnmarshaller();
+        Entity entity = (Entity) unmarshaller
+                .unmarshal(OozieFeedMapperTest.class.getResource(path));
+        store.publish(type, entity);
+        return entity;
+    }
+
+    protected void cleanupStore() throws FalconException {
+        for (EntityType type : EntityType.values()) {
+            Collection<String> entities = store.getEntities(type);
+            for (String entity : entities) {
+                store.remove(type, entity);
+            }
+        }
+    }
+
+    @AfterClass
+    public void stopDFS() {
+        srcMiniDFS.shutdown();
+        trgMiniDFS.shutdown();
+    }
+
+    @Test
+    public void testReplicationCoords() throws FalconException {
+        OozieFeedMapper feedMapper = new OozieFeedMapper(feed);
+        List<COORDINATORAPP> coords = feedMapper.getCoordinators(trgCluster,
+                new Path("/projects/falcon/"));
+        COORDINATORAPP coord = coords.get(0);
+
+        Assert.assertEquals("${nameNode}/projects/falcon/REPLICATION", coord
+                .getAction().getWorkflow().getAppPath());
+        Assert.assertEquals("FALCON_FEED_REPLICATION_" + feed.getName() + "_"
+                + srcCluster.getName(), coord.getName());
+        Assert.assertEquals("${coord:minutes(20)}", coord.getFrequency());
+        SYNCDATASET inputDataset = (SYNCDATASET) coord.getDatasets()
+                .getDatasetOrAsyncDataset().get(0);
+        SYNCDATASET outputDataset = (SYNCDATASET) coord.getDatasets()
+                .getDatasetOrAsyncDataset().get(1);
+
+        Assert.assertEquals("${coord:minutes(20)}", inputDataset.getFrequency());
+        Assert.assertEquals("input-dataset", inputDataset.getName());
+        Assert.assertEquals(
+                ClusterHelper.getStorageUrl(srcCluster)
+                        + "/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
+                inputDataset.getUriTemplate());
+
+        Assert.assertEquals("${coord:minutes(20)}",
+                outputDataset.getFrequency());
+        Assert.assertEquals("output-dataset", outputDataset.getName());
+        Assert.assertEquals(
+                "${nameNode}"
+                        + "/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
+                outputDataset.getUriTemplate());
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            if (prop.getName().equals("mapred.job.priority")) {
+                assertEquals(prop.getValue(), "NORMAL");
+                break;
+            }
         }
 
-	}
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/feed/src/test/resources/feed.xml
----------------------------------------------------------------------
diff --git a/feed/src/test/resources/feed.xml b/feed/src/test/resources/feed.xml
index aac717c..d5948b0 100644
--- a/feed/src/test/resources/feed.xml
+++ b/feed/src/test/resources/feed.xml
@@ -16,39 +16,41 @@
   limitations under the License.
   -->
 <feed description="clicks log" name="raw-logs" xmlns="uri:falcon:feed:0.1"
-	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+        >
 
-	<groups>online,bi</groups>
+    <groups>online,bi</groups>
 
-	<frequency>minutes(20)</frequency>
+    <frequency>minutes(20)</frequency>
     <timezone>UTC</timezone>
-    
-	<late-arrival cut-off="minutes(3)" />
-	<clusters>
-		<cluster name="corp1" type="source">
-			<validity start="2010-01-01T00:00Z" end="2010-01-01T02:00Z"/>
-			<retention limit="minutes(5)" action="delete" /> <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
-		</cluster>
-		<cluster name="corp2" type="target">
-			<validity start="2010-01-01T00:00Z" end="2010-01-01T02:00Z"/>
-			<retention limit="minutes(7)" action="delete" /> <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
-		</cluster>
-	</clusters>
-
-	<locations>
-		<location type="data"
-			path="/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}" />
-		<location type="stats" path="/projects/falcon/clicksStats" />
-		<location type="meta" path="/projects/falcon/clicksMetaData" />
-	</locations>
-
-	<ACL owner="testuser" group="group" permission="0x755" />
-	<schema location="/schema/clicks" provider="protobuf" />
-
-	<properties>
-		<property name="field3" value="value3" />
-		<property name="field2" value="value2" />
-
-		<property name="field4" value="value2" />
-	</properties>
+
+    <late-arrival cut-off="minutes(3)"/>
+    <clusters>
+        <cluster name="corp1" type="source">
+            <validity start="2010-01-01T00:00Z" end="2010-01-01T02:00Z"/>
+            <retention limit="minutes(5)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+        <cluster name="corp2" type="target">
+            <validity start="2010-01-01T00:00Z" end="2010-01-01T02:00Z"/>
+            <retention limit="minutes(7)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data"
+                  path="/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}"/>
+        <location type="stats" path="/projects/falcon/clicksStats"/>
+        <location type="meta" path="/projects/falcon/clicksMetaData"/>
+    </locations>
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+
+    <properties>
+        <property name="field3" value="value3"/>
+        <property name="field2" value="value2"/>
+
+        <property name="field4" value="value2"/>
+    </properties>
 </feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/feed/src/test/resources/src-cluster.xml
----------------------------------------------------------------------
diff --git a/feed/src/test/resources/src-cluster.xml b/feed/src/test/resources/src-cluster.xml
index 0567c34..75d8ed0 100644
--- a/feed/src/test/resources/src-cluster.xml
+++ b/feed/src/test/resources/src-cluster.xml
@@ -16,25 +16,25 @@
   limitations under the License.
   -->
 <cluster colo="gs1" description="" name="corp1" xmlns="uri:falcon:cluster:0.1"
-	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-	<interfaces>
-		<interface type="readonly" endpoint="http://localhost:50070"
-			version="0.20.2" />
-		<interface type="write" endpoint="hdfs://localhost:8020"
-			version="0.20.2" />
-		<interface type="execute" endpoint="localhost:8021" version="0.20.2" />
-		<interface type="workflow" endpoint="http://localhost:11000/oozie/"
-			version="3.1" />
-		<interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
-			version="5.1.6" />
-		<interface type="registry" endpoint="Hcat" version="1" />
-	</interfaces>
-	<locations>
-		<location name="temp" path="/tmp" />
-		<location name="working" path="/projects/falcon/working" />
-		<location name="staging" path="/projects/falcon/staging" />
-	</locations>
-	<properties>
-		<property name="separator" value="-" />
-	</properties>
+        >
+    <interfaces>
+        <interface type="readonly" endpoint="http://localhost:50070"
+                   version="0.20.2"/>
+        <interface type="write" endpoint="hdfs://localhost:8020"
+                   version="0.20.2"/>
+        <interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
+        <interface type="workflow" endpoint="http://localhost:11000/oozie/"
+                   version="3.1"/>
+        <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
+                   version="5.1.6"/>
+        <interface type="registry" endpoint="Hcat" version="1"/>
+    </interfaces>
+    <locations>
+        <location name="temp" path="/tmp"/>
+        <location name="working" path="/projects/falcon/working"/>
+        <location name="staging" path="/projects/falcon/staging"/>
+    </locations>
+    <properties>
+        <property name="separator" value="-"/>
+    </properties>
 </cluster>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/feed/src/test/resources/trg-cluster.xml
----------------------------------------------------------------------
diff --git a/feed/src/test/resources/trg-cluster.xml b/feed/src/test/resources/trg-cluster.xml
index d05c3f2..9a99b62 100644
--- a/feed/src/test/resources/trg-cluster.xml
+++ b/feed/src/test/resources/trg-cluster.xml
@@ -16,25 +16,25 @@
   limitations under the License.
   -->
 <cluster colo="gs2" description="" name="corp2" xmlns="uri:falcon:cluster:0.1"
-	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-	<interfaces>
-		<interface type="readonly" endpoint="http://localhost:50070"
-			version="0.20.2" />
-		<interface type="write" endpoint="hdfs://localhost:8020"
-			version="0.20.2" />
-		<interface type="execute" endpoint="localhost:8021" version="0.20.2" />
-		<interface type="workflow" endpoint="http://localhost:11000/oozie/"
-			version="3.1" />
-		<interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
-			version="5.1.6" />
-		<interface type="registry" endpoint="Hcat" version="1" />
-	</interfaces>
-	<locations>
-		<location name="temp" path="/tmp" />
-		<location name="working" path="/projects/falcon/working" />
-		<location name="staging" path="/projects/falcon/staging2" />
-	</locations>
-	<properties>
-		<property name="separator" value="-" />
-	</properties>
+        >
+    <interfaces>
+        <interface type="readonly" endpoint="http://localhost:50070"
+                   version="0.20.2"/>
+        <interface type="write" endpoint="hdfs://localhost:8020"
+                   version="0.20.2"/>
+        <interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
+        <interface type="workflow" endpoint="http://localhost:11000/oozie/"
+                   version="3.1"/>
+        <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
+                   version="5.1.6"/>
+        <interface type="registry" endpoint="Hcat" version="1"/>
+    </interfaces>
+    <locations>
+        <location name="temp" path="/tmp"/>
+        <location name="working" path="/projects/falcon/working"/>
+        <location name="staging" path="/projects/falcon/staging2"/>
+    </locations>
+    <properties>
+        <property name="separator" value="-"/>
+    </properties>
 </cluster>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java b/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
index 6112e98..f6b90df 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
@@ -18,6 +18,13 @@
 
 package org.apache.falcon.messaging;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Logger;
+
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -28,200 +35,190 @@ import java.util.Date;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
-import org.apache.commons.cli.CommandLine;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.log4j.Logger;
-
 /**
  * Value Object which is stored in JMS Topic as MapMessage
- * 
  */
 public class EntityInstanceMessage {
 
-	private final Map<ARG, String> keyValueMap = new LinkedHashMap<ARG, String>();
-	private static final Logger LOG = Logger
-			.getLogger(EntityInstanceMessage.class);
-	private static final String FALCON_ENTITY_TOPIC_NAME = "FALCON.ENTITY.TOPIC";
-
-	public enum EntityOps {
-		GENERATE, DELETE, ARCHIVE, REPLICATE, CHMOD
-	}
-
-	public enum ARG {
-		entityName("entityName"), feedNames("feedNames"), feedInstancePaths(
-				"feedInstancePaths"), workflowId("workflowId"), runId("runId"), nominalTime(
-				"nominalTime"), timeStamp("timeStamp"), brokerUrl("broker.url"), brokerImplClass(
-				"broker.impl.class"), entityType("entityType"), operation(
-				"operation"), logFile("logFile"), topicName("topicName"), status(
-				"status"), brokerTTL("broker.ttlInMins"),cluster("cluster");
-
-		private String propName;
-
-		private ARG(String propName) {
-			this.propName = propName;
-		}
-
-		/**
-		 * 
-		 * @return Name of the Argument used in the parent workflow to pass
-		 *         arguments to MessageProducer Main class.
-		 */
-		public String getArgName() {
-			return this.name();
-		}
-
-		/**
-		 * 
-		 * @return Name of the property used in the startup.properties,
-		 *         coordinator and parent workflow.
-		 */
-		public String getPropName() {
-			return this.propName;
-		}
-	}
-
-	public Map<ARG, String> getKeyValueMap() {
-		return this.keyValueMap;
-	}
-
-	public String getTopicName() {
-		return this.keyValueMap.get(ARG.topicName);
-	}
-
-	public String getFeedName() {
-		return this.keyValueMap.get(ARG.feedNames);
-	}
-
-	public void setFeedName(String feedName) {
-		this.keyValueMap.remove(ARG.feedNames);
-		this.keyValueMap.put(ARG.feedNames, feedName);
-	}
-
-	public String getFeedInstancePath() {
-		return this.keyValueMap.get(ARG.feedInstancePaths);
-	}
-
-	public void setFeedInstancePath(String feedInstancePath) {
-		this.keyValueMap.remove(ARG.feedInstancePaths);
-		this.keyValueMap.put(ARG.feedInstancePaths, feedInstancePath);
-	}
-
-	public String getEntityType() {
-		return this.keyValueMap.get(ARG.entityType);
-	}
-
-	public String getBrokerTTL() {
-		return this.keyValueMap.get(ARG.brokerTTL);
-	}
-
-	public void convertDateFormat() throws ParseException {
-		String date = this.keyValueMap.remove(ARG.nominalTime);
-		this.keyValueMap.put(ARG.nominalTime, getFalconDate(date));
-		date = this.keyValueMap.remove(ARG.timeStamp);
-		this.keyValueMap.put(ARG.timeStamp, getFalconDate(date));
-	}
-
-	public static EntityInstanceMessage[] getMessages(CommandLine cmd)
-			throws ParseException {
-		String[] feedNames = getFeedNames(cmd);
-		if(feedNames == null){
-		    return null;
-		}
-		
-		String[] feedPaths;
-		try {
-			feedPaths = getFeedPaths(cmd);
-		} catch (IOException e) {
-			LOG.error("Error getting instance paths: ", e);
-			throw new RuntimeException(e);
-		}
-
-		EntityInstanceMessage[] messages = new EntityInstanceMessage[feedPaths.length];
-		for (int i = 0; i < feedPaths.length; i++) {
-			EntityInstanceMessage message = new EntityInstanceMessage();
-			setDefaultValues(cmd, message);
-			// override default values
-			if (message.getEntityType().equalsIgnoreCase("PROCESS")) {
-				message.setFeedName(feedNames[i]);
-			} else {
-				message.setFeedName(message.getFeedName());
-			}
-			message.setFeedInstancePath(feedPaths[i]);
-			message.convertDateFormat();
-			messages[i] = message;
-		}
-
-		return messages;
-	}
-
-	private static void setDefaultValues(CommandLine cmd,
-			EntityInstanceMessage message) {
-		for (ARG arg : ARG.values()) {
-			message.keyValueMap.put(arg, cmd.getOptionValue(arg.name()));
-		}
-	}
-
-	private static String[] getFeedNames(CommandLine cmd) {
-		String feedNameStr = cmd.getOptionValue(ARG.feedNames.getArgName());
+    private final Map<ARG, String> keyValueMap = new LinkedHashMap<ARG, String>();
+    private static final Logger LOG = Logger
+            .getLogger(EntityInstanceMessage.class);
+    private static final String FALCON_ENTITY_TOPIC_NAME = "FALCON.ENTITY.TOPIC";
+
+    public enum EntityOps {
+        GENERATE, DELETE, ARCHIVE, REPLICATE, CHMOD
+    }
+
+    public enum ARG {
+        entityName("entityName"), feedNames("feedNames"), feedInstancePaths(
+                "feedInstancePaths"), workflowId("workflowId"), runId("runId"), nominalTime(
+                "nominalTime"), timeStamp("timeStamp"), brokerUrl("broker.url"), brokerImplClass(
+                "broker.impl.class"), entityType("entityType"), operation(
+                "operation"), logFile("logFile"), topicName("topicName"), status(
+                "status"), brokerTTL("broker.ttlInMins"), cluster("cluster");
+
+        private String propName;
+
+        private ARG(String propName) {
+            this.propName = propName;
+        }
+
+        /**
+         * @return Name of the Argument used in the parent workflow to pass
+         *         arguments to MessageProducer Main class.
+         */
+        public String getArgName() {
+            return this.name();
+        }
+
+        /**
+         * @return Name of the property used in the startup.properties,
+         *         coordinator and parent workflow.
+         */
+        public String getPropName() {
+            return this.propName;
+        }
+    }
+
+    public Map<ARG, String> getKeyValueMap() {
+        return this.keyValueMap;
+    }
+
+    public String getTopicName() {
+        return this.keyValueMap.get(ARG.topicName);
+    }
+
+    public String getFeedName() {
+        return this.keyValueMap.get(ARG.feedNames);
+    }
+
+    public void setFeedName(String feedName) {
+        this.keyValueMap.remove(ARG.feedNames);
+        this.keyValueMap.put(ARG.feedNames, feedName);
+    }
+
+    public String getFeedInstancePath() {
+        return this.keyValueMap.get(ARG.feedInstancePaths);
+    }
+
+    public void setFeedInstancePath(String feedInstancePath) {
+        this.keyValueMap.remove(ARG.feedInstancePaths);
+        this.keyValueMap.put(ARG.feedInstancePaths, feedInstancePath);
+    }
+
+    public String getEntityType() {
+        return this.keyValueMap.get(ARG.entityType);
+    }
+
+    public String getBrokerTTL() {
+        return this.keyValueMap.get(ARG.brokerTTL);
+    }
+
+    public void convertDateFormat() throws ParseException {
+        String date = this.keyValueMap.remove(ARG.nominalTime);
+        this.keyValueMap.put(ARG.nominalTime, getFalconDate(date));
+        date = this.keyValueMap.remove(ARG.timeStamp);
+        this.keyValueMap.put(ARG.timeStamp, getFalconDate(date));
+    }
+
+    public static EntityInstanceMessage[] getMessages(CommandLine cmd)
+            throws ParseException {
+        String[] feedNames = getFeedNames(cmd);
+        if (feedNames == null) {
+            return null;
+        }
+
+        String[] feedPaths;
+        try {
+            feedPaths = getFeedPaths(cmd);
+        } catch (IOException e) {
+            LOG.error("Error getting instance paths: ", e);
+            throw new RuntimeException(e);
+        }
+
+        EntityInstanceMessage[] messages = new EntityInstanceMessage[feedPaths.length];
+        for (int i = 0; i < feedPaths.length; i++) {
+            EntityInstanceMessage message = new EntityInstanceMessage();
+            setDefaultValues(cmd, message);
+            // override default values
+            if (message.getEntityType().equalsIgnoreCase("PROCESS")) {
+                message.setFeedName(feedNames[i]);
+            } else {
+                message.setFeedName(message.getFeedName());
+            }
+            message.setFeedInstancePath(feedPaths[i]);
+            message.convertDateFormat();
+            messages[i] = message;
+        }
+
+        return messages;
+    }
+
+    private static void setDefaultValues(CommandLine cmd,
+                                         EntityInstanceMessage message) {
+        for (ARG arg : ARG.values()) {
+            message.keyValueMap.put(arg, cmd.getOptionValue(arg.name()));
+        }
+    }
+
+    private static String[] getFeedNames(CommandLine cmd) {
+        String feedNameStr = cmd.getOptionValue(ARG.feedNames.getArgName());
+        String topicName = cmd.getOptionValue(ARG.topicName.getArgName());
+        if (topicName.equals(FALCON_ENTITY_TOPIC_NAME)) {
+            return new String[]{feedNameStr};
+        }
+        if (feedNameStr.equals("null")) {
+            return null;
+        }
+
+        return feedNameStr.split(",");
+    }
+
+    private static String[] getFeedPaths(CommandLine cmd) throws IOException {
         String topicName = cmd.getOptionValue(ARG.topicName.getArgName());
-		if (topicName.equals(FALCON_ENTITY_TOPIC_NAME)) {
-			return new String[] { feedNameStr };
-		}
-		if (feedNameStr.equals("null")) {
-			return null;
-		}
-		
-		return feedNameStr.split(",");
-	}
-
-	private static String[] getFeedPaths(CommandLine cmd) throws IOException {
-		String topicName = cmd.getOptionValue(ARG.topicName.getArgName());
-		String operation = cmd.getOptionValue(ARG.operation.getArgName());
-
-		if (topicName.equals(FALCON_ENTITY_TOPIC_NAME)) {
-			LOG.debug("Returning instance paths for Falcon Topic: "
-					+ cmd.getOptionValue(ARG.feedInstancePaths.getArgName()));
-			return new String[] { cmd.getOptionValue(ARG.feedInstancePaths
-					.getArgName()) };
-		}
-
-		if (operation.equals(EntityOps.GENERATE.name())
-				|| operation.equals(EntityOps.REPLICATE.name())) {
-			LOG.debug("Returning instance paths: "
-					+ cmd.getOptionValue(ARG.feedInstancePaths.getArgName()));
-			return cmd.getOptionValue(ARG.feedInstancePaths.getArgName())
-					.split(",");
-		}
-		//else case of feed retention
-		Path logFile = new Path(cmd.getOptionValue(ARG.logFile.getArgName()));
-		FileSystem fs = FileSystem.get(logFile.toUri(), new Configuration());
-		ByteArrayOutputStream writer = new ByteArrayOutputStream();
-		InputStream instance = fs.open(logFile);
-		IOUtils.copyBytes(instance, writer, 4096, true);
-		String[] instancePaths = writer.toString().split("=");
-		fs.delete(logFile, true);
-		LOG.info("Deleted feed instance paths file:"+logFile);
-		if (instancePaths.length == 1) {
-			LOG.debug("Returning 0 instance paths for feed ");
-			return new String[0];
-		} else {
-			LOG.debug("Returning instance paths for feed " + instancePaths[1]);
-			return instancePaths[1].split(",");
-		}
-
-	}
-
-	public String getFalconDate(String nominalTime) throws ParseException {
-		DateFormat nominalFormat = new SimpleDateFormat(
-				"yyyy'-'MM'-'dd'-'HH'-'mm");
-		Date nominalDate = nominalFormat.parse(nominalTime);
-		DateFormat falconFormat = new SimpleDateFormat(
-				"yyyy'-'MM'-'dd'T'HH':'mm'Z'");
-		return falconFormat.format(nominalDate);
-
-	}
+        String operation = cmd.getOptionValue(ARG.operation.getArgName());
+
+        if (topicName.equals(FALCON_ENTITY_TOPIC_NAME)) {
+            LOG.debug("Returning instance paths for Falcon Topic: "
+                    + cmd.getOptionValue(ARG.feedInstancePaths.getArgName()));
+            return new String[]{cmd.getOptionValue(ARG.feedInstancePaths
+                    .getArgName())};
+        }
+
+        if (operation.equals(EntityOps.GENERATE.name())
+                || operation.equals(EntityOps.REPLICATE.name())) {
+            LOG.debug("Returning instance paths: "
+                    + cmd.getOptionValue(ARG.feedInstancePaths.getArgName()));
+            return cmd.getOptionValue(ARG.feedInstancePaths.getArgName())
+                    .split(",");
+        }
+        //else case of feed retention
+        Path logFile = new Path(cmd.getOptionValue(ARG.logFile.getArgName()));
+        FileSystem fs = FileSystem.get(logFile.toUri(), new Configuration());
+        ByteArrayOutputStream writer = new ByteArrayOutputStream();
+        InputStream instance = fs.open(logFile);
+        IOUtils.copyBytes(instance, writer, 4096, true);
+        String[] instancePaths = writer.toString().split("=");
+        fs.delete(logFile, true);
+        LOG.info("Deleted feed instance paths file:" + logFile);
+        if (instancePaths.length == 1) {
+            LOG.debug("Returning 0 instance paths for feed ");
+            return new String[0];
+        } else {
+            LOG.debug("Returning instance paths for feed " + instancePaths[1]);
+            return instancePaths[1].split(",");
+        }
+
+    }
+
+    public String getFalconDate(String nominalTime) throws ParseException {
+        DateFormat nominalFormat = new SimpleDateFormat(
+                "yyyy'-'MM'-'dd'-'HH'-'mm");
+        Date nominalDate = nominalFormat.parse(nominalTime);
+        DateFormat falconFormat = new SimpleDateFormat(
+                "yyyy'-'MM'-'dd'T'HH':'mm'Z'");
+        return falconFormat.format(nominalDate);
+
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessageCreator.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessageCreator.java b/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessageCreator.java
index 867eb44..0a8e7df 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessageCreator.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessageCreator.java
@@ -18,42 +18,41 @@
 
 package org.apache.falcon.messaging;
 
-import java.util.Map.Entry;
+import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
 
 import javax.jms.JMSException;
 import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.Session;
-
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
+import java.util.Map.Entry;
 
 /**
 * Falcon JMS message creator - creates a JMS MapMessage
  */
 public class EntityInstanceMessageCreator {
 
-	private MapMessage mapMessage;
+    private MapMessage mapMessage;
 
-	private final EntityInstanceMessage instanceMessage;
+    private final EntityInstanceMessage instanceMessage;
 
-	public EntityInstanceMessageCreator(EntityInstanceMessage instanceMessage) {
-		this.instanceMessage = instanceMessage;
-	}
+    public EntityInstanceMessageCreator(EntityInstanceMessage instanceMessage) {
+        this.instanceMessage = instanceMessage;
+    }
 
-	public Message createMessage(Session session) throws JMSException {
-		mapMessage = session.createMapMessage();
-		for (Entry<ARG, String> entry : instanceMessage.getKeyValueMap()
-				.entrySet()) {
-			mapMessage.setString(entry.getKey().getArgName(), instanceMessage
-					.getKeyValueMap().get(entry.getKey()));
-		}
-		return mapMessage;
+    public Message createMessage(Session session) throws JMSException {
+        mapMessage = session.createMapMessage();
+        for (Entry<ARG, String> entry : instanceMessage.getKeyValueMap()
+                .entrySet()) {
+            mapMessage.setString(entry.getKey().getArgName(), instanceMessage
+                    .getKeyValueMap().get(entry.getKey()));
+        }
+        return mapMessage;
 
-	}
+    }
 
-	@Override
-	public String toString() {
-		return this.mapMessage.toString();
-	}
+    @Override
+    public String toString() {
+        return this.mapMessage.toString();
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java
index dbbf70d..cb0ad8a 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java
@@ -18,167 +18,160 @@
 
 package org.apache.falcon.messaging;
 
-import java.lang.reflect.InvocationTargetException;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Session;
-import javax.jms.Topic;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.*;
+import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
 import org.apache.log4j.Logger;
 
+import javax.jms.*;
+import java.lang.reflect.InvocationTargetException;
+
 public class MessageProducer extends Configured implements Tool {
 
-	private Connection connection;
-	private static final Logger LOG = Logger.getLogger(MessageProducer.class);
-	private static final long DEFAULT_TTL = 3 * 24 * 60 * 60 * 1000;
-
-	/**
-	 * 
-	 * @param arguments
-	 *            - Accepts a Message to be send to JMS topic, creates a new
-	 *            Topic based on topic name if it does not exist or else
-	 *            existing topic with the same name is used to send the message.
-	 * @throws JMSException
-	 */
-	protected void sendMessage(EntityInstanceMessage entityInstanceMessage)
-			throws JMSException {
-
-		Session session = connection.createSession(false,
-				Session.AUTO_ACKNOWLEDGE);
-		Topic entityTopic = session.createTopic(entityInstanceMessage
-				.getTopicName());
-		javax.jms.MessageProducer producer = session
-				.createProducer(entityTopic);
-		producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-		long messageTTL = DEFAULT_TTL;
-		try {
-			long messageTTLinMins = Long.parseLong(entityInstanceMessage
-					.getBrokerTTL());
-			messageTTL = messageTTLinMins * 60 * 1000;
-		} catch (NumberFormatException e) {
-			LOG.error("Error in parsing broker.ttl, setting TTL to:"
-					+ DEFAULT_TTL + " milli-seconds");
-		}
-		producer.setTimeToLive(messageTTL);
-		producer.send(new EntityInstanceMessageCreator(entityInstanceMessage)
-				.createMessage(session));
-
-	}
-
-	public static void main(String[] args) throws Exception {
-		ToolRunner.run(new MessageProducer(), args);
-	}
-
-	private void createAndStartConnection(String implementation,
-			String userName, String password, String url) throws JMSException,
-			ClassNotFoundException, IllegalArgumentException,
-			SecurityException, InstantiationException, IllegalAccessException,
-			InvocationTargetException, NoSuchMethodException {
-
-		Class<ConnectionFactory> clazz = (Class<ConnectionFactory>) MessageProducer.class
-				.getClassLoader().loadClass(implementation);
-
-		ConnectionFactory connectionFactory = clazz.getConstructor(
-				String.class, String.class, String.class).newInstance(userName,
-				password, url);
-
-		connection = connectionFactory.createConnection();
-		connection.start();
-	}
-
-	private static CommandLine getCommand(String[] arguments)
-			throws ParseException {
-		Options options = new Options();
-		addOption(options, new Option(ARG.brokerImplClass.getArgName(), true,
-				"message broker Implementation class"));
-		addOption(options, new Option(ARG.brokerTTL.getArgName(), true,
-				"message time-to-live"));
-		addOption(options, new Option(ARG.brokerUrl.getArgName(), true,
-				"message broker url"));
-		addOption(options, new Option(ARG.entityName.getArgName(), true,
-				"name of the entity"));
-		addOption(options, new Option(ARG.entityType.getArgName(), true,
-				"type of the entity"));
-		addOption(options, new Option(ARG.feedInstancePaths.getArgName(),
-				true, "feed instance paths"));
-		addOption(options, new Option(ARG.feedNames.getArgName(), true,
-				"feed names"));
-		addOption(options, new Option(ARG.logFile.getArgName(), true,
-				"log file path"));
-		addOption(options, new Option(ARG.nominalTime.getArgName(), true,
-				"instance time"));
-		addOption(options, new Option(ARG.operation.getArgName(), true,
-				"operation like generate, delete, archive"));
-		addOption(options, new Option(ARG.runId.getArgName(), true,
-				"current run-id of the instance"));
-		addOption(options, new Option(ARG.status.getArgName(), true,
-				"status of workflow instance"));
-		addOption(options, new Option(ARG.timeStamp.getArgName(), true,
-				"current timestamp"));
-		addOption(options, new Option(ARG.topicName.getArgName(), true,
-				"name of the topic to be used to send message"));
-		addOption(options, new Option(ARG.workflowId.getArgName(), true,
-				"workflow id"));
-		addOption(options, new Option(ARG.cluster.getArgName(), true,
-				"cluster name"));
-
-		return new GnuParser().parse(options, arguments);
-	}
-
-	private static void addOption(Options options, Option opt) {
-		opt.setRequired(true);
-		options.addOption(opt);
-	}
-
-	@Override
-	public int run(String[] args) throws Exception {
-		CommandLine cmd;
-		try {
-			cmd = getCommand(args);
-		} catch (ParseException e) {
-			throw new Exception("Unable to parse arguments: ", e);
-		}
-		EntityInstanceMessage[] entityInstanceMessage = EntityInstanceMessage
-				.getMessages(cmd);
-		if (entityInstanceMessage == null || entityInstanceMessage.length == 0) {
-			LOG.warn("No operation on output feed");
-			return 0;
-		}
-
-		MessageProducer falconMessageProducer = new MessageProducer();
-		try {
-			falconMessageProducer.createAndStartConnection(
-					cmd.getOptionValue(ARG.brokerImplClass.name()), "",
-					"", cmd.getOptionValue(ARG.brokerUrl.name()));
-			for (EntityInstanceMessage message : entityInstanceMessage) {
-				LOG.info("Sending message:" + message.getKeyValueMap());
-				falconMessageProducer.sendMessage(message);
-			}
-		} catch (JMSException e) {
-			LOG.error("Error in getConnection:", e);
-		} catch (Exception e) {
-			LOG.error("Error in getConnection:", e);
-		} finally {
-			try {
-			    if(falconMessageProducer.connection != null)
-			        falconMessageProducer.connection.close();
-			} catch (JMSException e) {
-				LOG.error("Error in closing connection:", e);
-			}
-		}
-		return 0;
-	}
+    private Connection connection;
+    private static final Logger LOG = Logger.getLogger(MessageProducer.class);
+    private static final long DEFAULT_TTL = 3 * 24 * 60 * 60 * 1000;
+
+    /**
+     * @param entityInstanceMessage - Accepts a message to be sent to a JMS topic; creates a new
+     *                  topic based on the topic name if it does not exist, otherwise the
+     *                  existing topic with the same name is used to send the message.
+     * @throws JMSException
+     */
+    protected void sendMessage(EntityInstanceMessage entityInstanceMessage)
+            throws JMSException {
+
+        Session session = connection.createSession(false,
+                Session.AUTO_ACKNOWLEDGE);
+        Topic entityTopic = session.createTopic(entityInstanceMessage
+                .getTopicName());
+        javax.jms.MessageProducer producer = session
+                .createProducer(entityTopic);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        long messageTTL = DEFAULT_TTL;
+        try {
+            long messageTTLinMins = Long.parseLong(entityInstanceMessage
+                    .getBrokerTTL());
+            messageTTL = messageTTLinMins * 60 * 1000;
+        } catch (NumberFormatException e) {
+            LOG.error("Error in parsing broker.ttl, setting TTL to:"
+                    + DEFAULT_TTL + " milli-seconds");
+        }
+        producer.setTimeToLive(messageTTL);
+        producer.send(new EntityInstanceMessageCreator(entityInstanceMessage)
+                .createMessage(session));
+
+    }
+
+    public static void main(String[] args) throws Exception {
+        ToolRunner.run(new MessageProducer(), args);
+    }
+
+    private void createAndStartConnection(String implementation,
+                                          String userName, String password, String url) throws JMSException,
+                                                                                               ClassNotFoundException,
+                                                                                               IllegalArgumentException,
+                                                                                               SecurityException,
+                                                                                               InstantiationException,
+                                                                                               IllegalAccessException,
+                                                                                               InvocationTargetException,
+                                                                                               NoSuchMethodException {
+
+        Class<ConnectionFactory> clazz = (Class<ConnectionFactory>) MessageProducer.class
+                .getClassLoader().loadClass(implementation);
+
+        ConnectionFactory connectionFactory = clazz.getConstructor(
+                String.class, String.class, String.class).newInstance(userName,
+                password, url);
+
+        connection = connectionFactory.createConnection();
+        connection.start();
+    }
+
+    private static CommandLine getCommand(String[] arguments)
+            throws ParseException {
+        Options options = new Options();
+        addOption(options, new Option(ARG.brokerImplClass.getArgName(), true,
+                "message broker Implementation class"));
+        addOption(options, new Option(ARG.brokerTTL.getArgName(), true,
+                "message time-to-live"));
+        addOption(options, new Option(ARG.brokerUrl.getArgName(), true,
+                "message broker url"));
+        addOption(options, new Option(ARG.entityName.getArgName(), true,
+                "name of the entity"));
+        addOption(options, new Option(ARG.entityType.getArgName(), true,
+                "type of the entity"));
+        addOption(options, new Option(ARG.feedInstancePaths.getArgName(),
+                true, "feed instance paths"));
+        addOption(options, new Option(ARG.feedNames.getArgName(), true,
+                "feed names"));
+        addOption(options, new Option(ARG.logFile.getArgName(), true,
+                "log file path"));
+        addOption(options, new Option(ARG.nominalTime.getArgName(), true,
+                "instance time"));
+        addOption(options, new Option(ARG.operation.getArgName(), true,
+                "operation like generate, delete, archive"));
+        addOption(options, new Option(ARG.runId.getArgName(), true,
+                "current run-id of the instance"));
+        addOption(options, new Option(ARG.status.getArgName(), true,
+                "status of workflow instance"));
+        addOption(options, new Option(ARG.timeStamp.getArgName(), true,
+                "current timestamp"));
+        addOption(options, new Option(ARG.topicName.getArgName(), true,
+                "name of the topic to be used to send message"));
+        addOption(options, new Option(ARG.workflowId.getArgName(), true,
+                "workflow id"));
+        addOption(options, new Option(ARG.cluster.getArgName(), true,
+                "cluster name"));
+
+        return new GnuParser().parse(options, arguments);
+    }
+
+    private static void addOption(Options options, Option opt) {
+        opt.setRequired(true);
+        options.addOption(opt);
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        CommandLine cmd;
+        try {
+            cmd = getCommand(args);
+        } catch (ParseException e) {
+            throw new Exception("Unable to parse arguments: ", e);
+        }
+        EntityInstanceMessage[] entityInstanceMessage = EntityInstanceMessage
+                .getMessages(cmd);
+        if (entityInstanceMessage == null || entityInstanceMessage.length == 0) {
+            LOG.warn("No operation on output feed");
+            return 0;
+        }
+
+        MessageProducer falconMessageProducer = new MessageProducer();
+        try {
+            falconMessageProducer.createAndStartConnection(
+                    cmd.getOptionValue(ARG.brokerImplClass.name()), "",
+                    "", cmd.getOptionValue(ARG.brokerUrl.name()));
+            for (EntityInstanceMessage message : entityInstanceMessage) {
+                LOG.info("Sending message:" + message.getKeyValueMap());
+                falconMessageProducer.sendMessage(message);
+            }
+        } catch (JMSException e) {
+            LOG.error("Error in getConnection:", e);
+        } catch (Exception e) {
+            LOG.error("Error in getConnection:", e);
+        } finally {
+            try {
+                if (falconMessageProducer.connection != null) {
+                    falconMessageProducer.connection.close();
+                }
+            } catch (JMSException e) {
+                LOG.error("Error in closing connection:", e);
+            }
+        }
+        return 0;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/messaging/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/messaging/src/main/resources/log4j.xml b/messaging/src/main/resources/log4j.xml
index a69599b..2788c63 100644
--- a/messaging/src/main/resources/log4j.xml
+++ b/messaging/src/main/resources/log4j.xml
@@ -20,44 +20,44 @@
 <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
 
 <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
-  <appender name="console" class="org.apache.log4j.ConsoleAppender">
-    <param name="Target" value="System.out"/>
-    <layout class="org.apache.log4j.PatternLayout">
-      <param name="ConversionPattern" value="%d %-5p - %m (%c{1}:%L)%n"/>
-    </layout>
-  </appender>
-
-  <appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
-    <param name="File" value="/var/log/falcon/application.log"/>
-    <param name="Append" value="true"/>
-    <param name="Threshold" value="debug"/>
-    <layout class="org.apache.log4j.PatternLayout">
-      <param name="ConversionPattern" value="%d %-5p - %m (%c{1}:%L)%n"/>
-    </layout>
-  </appender>
-
-  <appender name="AUDIT" class="org.apache.log4j.DailyRollingFileAppender">
-    <param name="File" value="/var/log/falcon/audit.log"/>
-    <param name="Append" value="true"/>
-    <param name="Threshold" value="debug"/>
-    <layout class="org.apache.log4j.PatternLayout">
-      <param name="ConversionPattern" value="%d %-5p - %m%n"/>
-    </layout>
-  </appender>
-
-  <logger name="org.apache.falcon" additivity="false">
-    <level value="debug"/>
-    <appender-ref ref="console" />
-  </logger>
-
-  <logger name="AUDIT">
-    <level value="info"/>
-    <appender-ref ref="AUDIT" />
-  </logger>
-
-  <root>
-    <priority value ="info" />
-    <appender-ref ref="console" />
-  </root>
+    <appender name="console" class="org.apache.log4j.ConsoleAppender">
+        <param name="Target" value="System.out"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %-5p - %m (%c{1}:%L)%n"/>
+        </layout>
+    </appender>
+
+    <appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
+        <param name="File" value="/var/log/falcon/application.log"/>
+        <param name="Append" value="true"/>
+        <param name="Threshold" value="debug"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %-5p - %m (%c{1}:%L)%n"/>
+        </layout>
+    </appender>
+
+    <appender name="AUDIT" class="org.apache.log4j.DailyRollingFileAppender">
+        <param name="File" value="/var/log/falcon/audit.log"/>
+        <param name="Append" value="true"/>
+        <param name="Threshold" value="debug"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %-5p - %m%n"/>
+        </layout>
+    </appender>
+
+    <logger name="org.apache.falcon" additivity="false">
+        <level value="debug"/>
+        <appender-ref ref="console"/>
+    </logger>
+
+    <logger name="AUDIT">
+        <level value="info"/>
+        <appender-ref ref="AUDIT"/>
+    </logger>
+
+    <root>
+        <priority value="info"/>
+        <appender-ref ref="console"/>
+    </root>
 
 </log4j:configuration>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
index 90c4689..9a13738 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
@@ -17,14 +17,6 @@
  */
 package org.apache.falcon.messaging;
 
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
@@ -33,137 +25,140 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import javax.jms.*;
+
 public class FalconTopicProducerTest {
 
-	private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true";
-	// private static final String BROKER_URL =
-	// "tcp://localhost:61616?daemon=true";
-	private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
-	private static final String TOPIC_NAME = "FALCON.ENTITY.TOPIC";
-	private BrokerService broker;
-
-	private volatile AssertionError error;
-
-	@BeforeClass
-	public void setup() throws Exception {
-		broker = new BrokerService();
-		broker.addConnector(BROKER_URL);
-		broker.setDataDirectory("target/activemq");
-		broker.setBrokerName("localhost");
-		broker.start();
-	}
-
-	@AfterClass
-	public void tearDown() throws Exception {
-		broker.deleteAllMessages();
-		broker.stop();
-	}
-	
-	@Test
-	public void testWithFeedOutputPaths() throws Exception{
-		String [] args = new String[] { "-" + ARG.entityName.getArgName(), "agg-coord",
-				"-" + ARG.feedNames.getArgName(), "click-logs,raw-logs",
-				"-" + ARG.feedInstancePaths.getArgName(),
-				"/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20",
-				"-" + ARG.workflowId.getArgName(), "workflow-01-00",
-				"-" + ARG.runId.getArgName(), "1",
-				"-" + ARG.nominalTime.getArgName(), "2011-01-01-01-00",
-				"-" + ARG.timeStamp.getArgName(), "2012-01-01-01-00",
-				"-" + ARG.brokerUrl.getArgName(), BROKER_URL,
-				"-" + ARG.brokerImplClass.getArgName(), (BROKER_IMPL_CLASS),
-				"-" + ARG.entityType.getArgName(), ("process"),
-				"-" + ARG.operation.getArgName(), ("GENERATE"),
-				"-" + ARG.logFile.getArgName(), ("/logFile"),
-				"-" + ARG.topicName.getArgName(), (TOPIC_NAME),
-				"-" + ARG.status.getArgName(), ("SUCCEEDED"),
-				"-" + ARG.brokerTTL.getArgName(), "10",
-				"-" + ARG.cluster.getArgName(), "corp" };
-		testProcessMessageCreator(args);
-	}
-	
-	@Test
-	public void testWithEmptyFeedOutputPaths() throws Exception{
-		String [] args = new String[] { "-" + ARG.entityName.getArgName(), "agg-coord",
-				"-" + ARG.feedNames.getArgName(), "null",
-				"-" + ARG.feedInstancePaths.getArgName(),
-				"null",
-				"-" + ARG.workflowId.getArgName(), "workflow-01-00",
-				"-" + ARG.runId.getArgName(), "1",
-				"-" + ARG.nominalTime.getArgName(), "2011-01-01-01-00",
-				"-" + ARG.timeStamp.getArgName(), "2012-01-01-01-00",
-				"-" + ARG.brokerUrl.getArgName(), BROKER_URL,
-				"-" + ARG.brokerImplClass.getArgName(), (BROKER_IMPL_CLASS),
-				"-" + ARG.entityType.getArgName(), ("process"),
-				"-" + ARG.operation.getArgName(), ("GENERATE"),
-				"-" + ARG.logFile.getArgName(), ("/logFile"),
-				"-" + ARG.topicName.getArgName(), (TOPIC_NAME),
-				"-" + ARG.status.getArgName(), ("SUCCEEDED"),
-				"-" + ARG.brokerTTL.getArgName(), "10",
-				"-" + ARG.cluster.getArgName(), "corp" };
-		testProcessMessageCreator(args);
-	}
-
-	private void testProcessMessageCreator(String[] args) throws Exception {
-
-		Thread t = new Thread() {
-			@Override
-			public void run() {
-				try {
-					consumer();
-				} catch (AssertionError e) {
-					error = e;
-				} catch (JMSException ignore) {
-
-				}
-			}
-		};
-		t.start();
-		Thread.sleep(1500);
-		new MessageProducer().run(args);
-		t.join();
-		if (error != null) {
-			throw error;
-		}
-	}
-
-	private void consumer() throws JMSException {
-		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
-				BROKER_URL);
-		Connection connection = connectionFactory.createConnection();
-		connection.start();
-
-		Session session = connection.createSession(false,
-				Session.AUTO_ACKNOWLEDGE);
-		Destination destination = session.createTopic(TOPIC_NAME);
-		MessageConsumer consumer = session.createConsumer(destination);
-
-		// wait till you get atleast one message
-		MapMessage m;
-		for (m = null; m == null;)
-			m = (MapMessage) consumer.receive();
-		System.out.println("Consumed: " + m.toString());
-
-		assertMessage(m);
-		Assert.assertTrue((m.getString(ARG.feedNames.getArgName())
-				.equals("click-logs,raw-logs"))
-				|| (m.getString(ARG.feedNames.getArgName()).equals("null")));
-		Assert.assertTrue(m.getString(ARG.feedInstancePaths.getArgName())
-				.equals("/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20")
-				|| (m.getString(ARG.feedInstancePaths.getArgName()).equals("null")));
-
-		connection.close();
-	}
-
-	private void assertMessage(MapMessage m) throws JMSException {
-		Assert.assertEquals(m.getString(ARG.entityName.getArgName()),
-				"agg-coord");
-		Assert.assertEquals(m.getString(ARG.workflowId.getArgName()),
-				"workflow-01-00");
-		Assert.assertEquals(m.getString(ARG.runId.getArgName()), "1");
-		Assert.assertEquals(m.getString(ARG.nominalTime.getArgName()),
-				"2011-01-01T01:00Z");
-		Assert.assertEquals(m.getString(ARG.timeStamp.getArgName()),
-				"2012-01-01T01:00Z");
-		Assert.assertEquals(m.getString(ARG.status.getArgName()), "SUCCEEDED");
-	}
+    private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true";
+    // private static final String BROKER_URL =
+    // "tcp://localhost:61616?daemon=true";
+    private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
+    private static final String TOPIC_NAME = "FALCON.ENTITY.TOPIC";
+    private BrokerService broker;
+
+    private volatile AssertionError error;
+
+    @BeforeClass
+    public void setup() throws Exception {
+        broker = new BrokerService();
+        broker.addConnector(BROKER_URL);
+        broker.setDataDirectory("target/activemq");
+        broker.setBrokerName("localhost");
+        broker.start();
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        broker.deleteAllMessages();
+        broker.stop();
+    }
+
+    @Test
+    public void testWithFeedOutputPaths() throws Exception {
+        String[] args = new String[]{"-" + ARG.entityName.getArgName(), "agg-coord",
+                                     "-" + ARG.feedNames.getArgName(), "click-logs,raw-logs",
+                                     "-" + ARG.feedInstancePaths.getArgName(),
+                                     "/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20",
+                                     "-" + ARG.workflowId.getArgName(), "workflow-01-00",
+                                     "-" + ARG.runId.getArgName(), "1",
+                                     "-" + ARG.nominalTime.getArgName(), "2011-01-01-01-00",
+                                     "-" + ARG.timeStamp.getArgName(), "2012-01-01-01-00",
+                                     "-" + ARG.brokerUrl.getArgName(), BROKER_URL,
+                                     "-" + ARG.brokerImplClass.getArgName(), (BROKER_IMPL_CLASS),
+                                     "-" + ARG.entityType.getArgName(), ("process"),
+                                     "-" + ARG.operation.getArgName(), ("GENERATE"),
+                                     "-" + ARG.logFile.getArgName(), ("/logFile"),
+                                     "-" + ARG.topicName.getArgName(), (TOPIC_NAME),
+                                     "-" + ARG.status.getArgName(), ("SUCCEEDED"),
+                                     "-" + ARG.brokerTTL.getArgName(), "10",
+                                     "-" + ARG.cluster.getArgName(), "corp"};
+        testProcessMessageCreator(args);
+    }
+
+    @Test
+    public void testWithEmptyFeedOutputPaths() throws Exception {
+        String[] args = new String[]{"-" + ARG.entityName.getArgName(), "agg-coord",
+                                     "-" + ARG.feedNames.getArgName(), "null",
+                                     "-" + ARG.feedInstancePaths.getArgName(),
+                                     "null",
+                                     "-" + ARG.workflowId.getArgName(), "workflow-01-00",
+                                     "-" + ARG.runId.getArgName(), "1",
+                                     "-" + ARG.nominalTime.getArgName(), "2011-01-01-01-00",
+                                     "-" + ARG.timeStamp.getArgName(), "2012-01-01-01-00",
+                                     "-" + ARG.brokerUrl.getArgName(), BROKER_URL,
+                                     "-" + ARG.brokerImplClass.getArgName(), (BROKER_IMPL_CLASS),
+                                     "-" + ARG.entityType.getArgName(), ("process"),
+                                     "-" + ARG.operation.getArgName(), ("GENERATE"),
+                                     "-" + ARG.logFile.getArgName(), ("/logFile"),
+                                     "-" + ARG.topicName.getArgName(), (TOPIC_NAME),
+                                     "-" + ARG.status.getArgName(), ("SUCCEEDED"),
+                                     "-" + ARG.brokerTTL.getArgName(), "10",
+                                     "-" + ARG.cluster.getArgName(), "corp"};
+        testProcessMessageCreator(args);
+    }
+
+    private void testProcessMessageCreator(String[] args) throws Exception {
+
+        Thread t = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    consumer();
+                } catch (AssertionError e) {
+                    error = e;
+                } catch (JMSException ignore) {
+
+                }
+            }
+        };
+        t.start();
+        Thread.sleep(1500);
+        new MessageProducer().run(args);
+        t.join();
+        if (error != null) {
+            throw error;
+        }
+    }
+
+    private void consumer() throws JMSException {
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
+                BROKER_URL);
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false,
+                Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createTopic(TOPIC_NAME);
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // wait until we receive at least one message
+        MapMessage m;
+        for (m = null; m == null; ) {
+            m = (MapMessage) consumer.receive();
+        }
+        System.out.println("Consumed: " + m.toString());
+
+        assertMessage(m);
+        Assert.assertTrue((m.getString(ARG.feedNames.getArgName())
+                .equals("click-logs,raw-logs"))
+                || (m.getString(ARG.feedNames.getArgName()).equals("null")));
+        Assert.assertTrue(m.getString(ARG.feedInstancePaths.getArgName())
+                .equals("/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20")
+                || (m.getString(ARG.feedInstancePaths.getArgName()).equals("null")));
+
+        connection.close();
+    }
+
+    private void assertMessage(MapMessage m) throws JMSException {
+        Assert.assertEquals(m.getString(ARG.entityName.getArgName()),
+                "agg-coord");
+        Assert.assertEquals(m.getString(ARG.workflowId.getArgName()),
+                "workflow-01-00");
+        Assert.assertEquals(m.getString(ARG.runId.getArgName()), "1");
+        Assert.assertEquals(m.getString(ARG.nominalTime.getArgName()),
+                "2011-01-01T01:00Z");
+        Assert.assertEquals(m.getString(ARG.timeStamp.getArgName()),
+                "2012-01-01T01:00Z");
+        Assert.assertEquals(m.getString(ARG.status.getArgName()), "SUCCEEDED");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
index 5607493..45252dd 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
@@ -21,12 +21,12 @@ package org.apache.falcon.messaging;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -38,162 +38,166 @@ import java.io.OutputStream;
 
 public class FeedProducerTest {
 
-	private String[] args;
-	private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true";
-	// private static final String BROKER_URL =
-	// "tcp://localhost:61616?daemon=true";
-	private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
-	private static final String TOPIC_NAME = "Falcon.process1.click-logs";
-	private BrokerService broker;
+    private String[] args;
+    private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true";
+    // private static final String BROKER_URL =
+    // "tcp://localhost:61616?daemon=true";
+    private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
+    private static final String TOPIC_NAME = "Falcon.process1.click-logs";
+    private BrokerService broker;
 
-	private Path logFile;
+    private Path logFile;
 
-	private volatile AssertionError error;
-	private EmbeddedCluster dfsCluster;
-	private Configuration conf ;
+    private volatile AssertionError error;
+    private EmbeddedCluster dfsCluster;
+    private Configuration conf;
 
-	@BeforeClass
-	public void setup() throws Exception {
+    @BeforeClass
+    public void setup() throws Exception {
 
-		this.dfsCluster = EmbeddedCluster.newCluster("testCluster", false);
+        this.dfsCluster = EmbeddedCluster.newCluster("testCluster", false);
         conf = dfsCluster.getConf();
-		logFile = new Path(conf.get("fs.default.name"),
-				"/falcon/feed/agg-logs/instance-2012-01-01-10-00.csv");
-
-		args = new String[] { "-" + ARG.entityName.getArgName(), TOPIC_NAME,
-				"-" + ARG.feedNames.getArgName(), "click-logs",
-				"-" + ARG.feedInstancePaths.getArgName(),
-				"/click-logs/10/05/05/00/20",
-				"-" + ARG.workflowId.getArgName(), "workflow-01-00",
-				"-" + ARG.runId.getArgName(), "1",
-				"-" + ARG.nominalTime.getArgName(), "2011-01-01-01-00",
-				"-" + ARG.timeStamp.getArgName(), "2012-01-01-01-00",
-				"-" + ARG.brokerUrl.getArgName(), BROKER_URL,
-				"-" + ARG.brokerImplClass.getArgName(), (BROKER_IMPL_CLASS),
-				"-" + ARG.entityType.getArgName(), ("FEED"),
-				"-" + ARG.operation.getArgName(), ("DELETE"),
-				"-" + ARG.logFile.getArgName(), (logFile.toString()),
-				"-" + ARG.topicName.getArgName(), (TOPIC_NAME),
-				"-" + ARG.status.getArgName(), ("SUCCEEDED"),
-				"-" + ARG.brokerTTL.getArgName(), "10",
-				"-" + ARG.cluster.getArgName(), "corp" };
-
-		broker = new BrokerService();
-		broker.addConnector(BROKER_URL);
-		broker.setDataDirectory("target/activemq");
-		broker.start();
-	}
-
-	@AfterClass
-	public void tearDown() throws Exception {
-		broker.deleteAllMessages();
-		broker.stop();
-		this.dfsCluster.shutdown();
-	}
-
-	@Test
-	public void testLogFile() throws Exception {
-		FileSystem fs = dfsCluster.getFileSystem();
-		OutputStream out = fs.create(logFile);
-		InputStream in = new ByteArrayInputStream(
-				("instancePaths=/falcon/feed/agg-logs/path1/2010/10/10/20,"
-						+ "/falcon/feed/agg-logs/path1/2010/10/10/21,"
-						+ "/falcon/feed/agg-logs/path1/2010/10/10/22,"
-						+ "/falcon/feed/agg-logs/path1/2010/10/10/23")
-						.getBytes());
-		IOUtils.copyBytes(in, out, conf);
-		testProcessMessageCreator();
-	}
-
-	@Test
-	public void testEmptyLogFile() throws Exception {
-		FileSystem fs = dfsCluster.getFileSystem();
-		OutputStream out = fs.create(logFile);
-		InputStream in = new ByteArrayInputStream(("instancePaths=").getBytes());
-		IOUtils.copyBytes(in, out, conf);
-
-		new MessageProducer().run(this.args);
-	}
-
-	private void testProcessMessageCreator() throws Exception {
-
-		Thread t = new Thread() {
-			@Override
-			public void run() {
-				try {
-					consumer();
-				} catch (AssertionError e) {
-					error = e;
-				} catch (JMSException ignore) {
-
-				}
-			}
-		};
-		t.start();
-		Thread.sleep(1500);
-		new MessageProducer().run(this.args);
-		t.join();
-		if (error != null) {
-			throw error;
-		}
-	}
-
-	private void consumer() throws JMSException {
-		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
-				BROKER_URL);
-		Connection connection = connectionFactory.createConnection();
-		connection.start();
-
-		Session session = connection.createSession(false,
-				Session.AUTO_ACKNOWLEDGE);
-		Destination destination = session.createTopic(TOPIC_NAME);
-		MessageConsumer consumer = session.createConsumer(destination);
-
-		// wait till you get atleast one message
-		MapMessage m;
-		for (m = null; m == null;)
-			m = (MapMessage) consumer.receive();
-		System.out.println("Consumed: " + m.toString());
-		assertMessage(m);
-		Assert.assertEquals(m.getString(ARG.feedInstancePaths.getArgName()),
-				"/falcon/feed/agg-logs/path1/2010/10/10/20");
-
-		for (m = null; m == null;)
-			m = (MapMessage) consumer.receive();
-		System.out.println("Consumed: " + m.toString());
-		assertMessage(m);
-		Assert.assertEquals(m.getString(ARG.feedInstancePaths.getArgName()),
-				"/falcon/feed/agg-logs/path1/2010/10/10/21");
-
-		for (m = null; m == null;)
-			m = (MapMessage) consumer.receive();
-		System.out.println("Consumed: " + m.toString());
-		assertMessage(m);
-		Assert.assertEquals(m.getString(ARG.feedInstancePaths.getArgName()),
-				"/falcon/feed/agg-logs/path1/2010/10/10/22");
-
-		for (m = null; m == null;)
-			m = (MapMessage) consumer.receive();
-		System.out.println("Consumed: " + m.toString());
-		assertMessage(m);
-		Assert.assertEquals(m.getString(ARG.feedInstancePaths.getArgName()),
-				"/falcon/feed/agg-logs/path1/2010/10/10/23");
-
-		connection.close();
-	}
-
-	private void assertMessage(MapMessage m) throws JMSException {
-		Assert.assertEquals(m.getString(ARG.entityName.getArgName()),
-				TOPIC_NAME);
-		Assert.assertEquals(m.getString(ARG.operation.getArgName()), "DELETE");
-		Assert.assertEquals(m.getString(ARG.workflowId.getArgName()),
-				"workflow-01-00");
-		Assert.assertEquals(m.getString(ARG.runId.getArgName()), "1");
-		Assert.assertEquals(m.getString(ARG.nominalTime.getArgName()),
-				"2011-01-01T01:00Z");
-		Assert.assertEquals(m.getString(ARG.timeStamp.getArgName()),
-				"2012-01-01T01:00Z");
-		Assert.assertEquals(m.getString(ARG.status.getArgName()), "SUCCEEDED");
-	}
+        logFile = new Path(conf.get("fs.default.name"),
+                "/falcon/feed/agg-logs/instance-2012-01-01-10-00.csv");
+
+        args = new String[]{"-" + ARG.entityName.getArgName(), TOPIC_NAME,
+                            "-" + ARG.feedNames.getArgName(), "click-logs",
+                            "-" + ARG.feedInstancePaths.getArgName(),
+                            "/click-logs/10/05/05/00/20",
+                            "-" + ARG.workflowId.getArgName(), "workflow-01-00",
+                            "-" + ARG.runId.getArgName(), "1",
+                            "-" + ARG.nominalTime.getArgName(), "2011-01-01-01-00",
+                            "-" + ARG.timeStamp.getArgName(), "2012-01-01-01-00",
+                            "-" + ARG.brokerUrl.getArgName(), BROKER_URL,
+                            "-" + ARG.brokerImplClass.getArgName(), (BROKER_IMPL_CLASS),
+                            "-" + ARG.entityType.getArgName(), ("FEED"),
+                            "-" + ARG.operation.getArgName(), ("DELETE"),
+                            "-" + ARG.logFile.getArgName(), (logFile.toString()),
+                            "-" + ARG.topicName.getArgName(), (TOPIC_NAME),
+                            "-" + ARG.status.getArgName(), ("SUCCEEDED"),
+                            "-" + ARG.brokerTTL.getArgName(), "10",
+                            "-" + ARG.cluster.getArgName(), "corp"};
+
+        broker = new BrokerService();
+        broker.addConnector(BROKER_URL);
+        broker.setDataDirectory("target/activemq");
+        broker.start();
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        broker.deleteAllMessages();
+        broker.stop();
+        this.dfsCluster.shutdown();
+    }
+
+    @Test
+    public void testLogFile() throws Exception {
+        FileSystem fs = dfsCluster.getFileSystem();
+        OutputStream out = fs.create(logFile);
+        InputStream in = new ByteArrayInputStream(
+                ("instancePaths=/falcon/feed/agg-logs/path1/2010/10/10/20,"
+                        + "/falcon/feed/agg-logs/path1/2010/10/10/21,"
+                        + "/falcon/feed/agg-logs/path1/2010/10/10/22,"
+                        + "/falcon/feed/agg-logs/path1/2010/10/10/23")
+                        .getBytes());
+        IOUtils.copyBytes(in, out, conf);
+        testProcessMessageCreator();
+    }
+
+    @Test
+    public void testEmptyLogFile() throws Exception {
+        FileSystem fs = dfsCluster.getFileSystem();
+        OutputStream out = fs.create(logFile);
+        InputStream in = new ByteArrayInputStream(("instancePaths=").getBytes());
+        IOUtils.copyBytes(in, out, conf);
+
+        new MessageProducer().run(this.args);
+    }
+
+    private void testProcessMessageCreator() throws Exception {
+
+        Thread t = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    consumer();
+                } catch (AssertionError e) {
+                    error = e;
+                } catch (JMSException ignore) {
+
+                }
+            }
+        };
+        t.start();
+        Thread.sleep(1500);
+        new MessageProducer().run(this.args);
+        t.join();
+        if (error != null) {
+            throw error;
+        }
+    }
+
+    private void consumer() throws JMSException {
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
+                BROKER_URL);
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false,
+                Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createTopic(TOPIC_NAME);
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // wait till you get at least one message
+        MapMessage m;
+        for (m = null; m == null; ) {
+            m = (MapMessage) consumer.receive();
+        }
+        System.out.println("Consumed: " + m.toString());
+        assertMessage(m);
+        Assert.assertEquals(m.getString(ARG.feedInstancePaths.getArgName()),
+                "/falcon/feed/agg-logs/path1/2010/10/10/20");
+
+        for (m = null; m == null; ) {
+            m = (MapMessage) consumer.receive();
+        }
+        System.out.println("Consumed: " + m.toString());
+        assertMessage(m);
+        Assert.assertEquals(m.getString(ARG.feedInstancePaths.getArgName()),
+                "/falcon/feed/agg-logs/path1/2010/10/10/21");
+
+        for (m = null; m == null; ) {
+            m = (MapMessage) consumer.receive();
+        }
+        System.out.println("Consumed: " + m.toString());
+        assertMessage(m);
+        Assert.assertEquals(m.getString(ARG.feedInstancePaths.getArgName()),
+                "/falcon/feed/agg-logs/path1/2010/10/10/22");
+
+        for (m = null; m == null; ) {
+            m = (MapMessage) consumer.receive();
+        }
+        System.out.println("Consumed: " + m.toString());
+        assertMessage(m);
+        Assert.assertEquals(m.getString(ARG.feedInstancePaths.getArgName()),
+                "/falcon/feed/agg-logs/path1/2010/10/10/23");
+
+        connection.close();
+    }
+
+    private void assertMessage(MapMessage m) throws JMSException {
+        Assert.assertEquals(m.getString(ARG.entityName.getArgName()),
+                TOPIC_NAME);
+        Assert.assertEquals(m.getString(ARG.operation.getArgName()), "DELETE");
+        Assert.assertEquals(m.getString(ARG.workflowId.getArgName()),
+                "workflow-01-00");
+        Assert.assertEquals(m.getString(ARG.runId.getArgName()), "1");
+        Assert.assertEquals(m.getString(ARG.nominalTime.getArgName()),
+                "2011-01-01T01:00Z");
+        Assert.assertEquals(m.getString(ARG.timeStamp.getArgName()),
+                "2012-01-01T01:00Z");
+        Assert.assertEquals(m.getString(ARG.status.getArgName()), "SUCCEEDED");
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
index d2554ce..55a5fc0 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
@@ -18,14 +18,6 @@
 
 package org.apache.falcon.messaging;
 
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
@@ -34,117 +26,121 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import javax.jms.*;
+
 public class ProcessProducerTest {
 
-	private String[] args;
-	private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true";
-	// private static final String BROKER_URL =
-	// "tcp://localhost:61616?daemon=true";
-	private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
-	private static final String TOPIC_NAME = "FALCON.PROCESS";
-	private BrokerService broker;
-
-	private volatile AssertionError error;
-
-	@BeforeClass
-	public void setup() throws Exception {
-		args = new String[] { "-" + ARG.entityName.getArgName(), TOPIC_NAME,
-				"-" + ARG.feedNames.getArgName(), "click-logs,raw-logs",
-				"-" + ARG.feedInstancePaths.getArgName(),
-				"/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20",
-				"-" + ARG.workflowId.getArgName(), "workflow-01-00",
-				"-" + ARG.runId.getArgName(), "1",
-				"-" + ARG.nominalTime.getArgName(), "2011-01-01-01-00",
-				"-" + ARG.timeStamp.getArgName(), "2012-01-01-01-00",
-				"-" + ARG.brokerUrl.getArgName(), BROKER_URL,
-				"-" + ARG.brokerImplClass.getArgName(), (BROKER_IMPL_CLASS),
-				"-" + ARG.entityType.getArgName(), ("process"),
-				"-" + ARG.operation.getArgName(), ("GENERATE"),
-				"-" + ARG.logFile.getArgName(), ("/logFile"),
-				"-" + ARG.topicName.getArgName(), (TOPIC_NAME),
-				"-" + ARG.status.getArgName(), ("SUCCEEDED"),
-				"-" + ARG.brokerTTL.getArgName(), "10",
-				"-" + ARG.cluster.getArgName(), "corp" };
-		broker = new BrokerService();
-		broker.addConnector(BROKER_URL);
-		broker.setDataDirectory("target/activemq");
-		broker.setBrokerName("localhost");
-		broker.setSchedulerSupport(true);
-		broker.start();
-	}
-
-	@AfterClass
-	public void tearDown() throws Exception {
-		broker.deleteAllMessages();
-		broker.stop();
-	}
-
-	@Test
-	public void testProcessMessageCreator() throws Exception {
-
-		Thread t = new Thread() {
-			@Override
-			public void run() {
-				try {
-					consumer();
-				} catch (AssertionError e) {
-					error = e;
-				} catch (JMSException ignore) {
-
-				}
-			}
-		};
-		t.start();
-		Thread.sleep(1500);
-		new MessageProducer().run(this.args);
-		t.join();
-		if (error != null) {
-			throw error;
-		}
-	}
-
-	private void consumer() throws JMSException {
-		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
-				BROKER_URL);
-		Connection connection = connectionFactory.createConnection();
-		connection.start();
-
-		Session session = connection.createSession(false,
-				Session.AUTO_ACKNOWLEDGE);
-		Destination destination = session.createTopic(TOPIC_NAME);
-		MessageConsumer consumer = session.createConsumer(destination);
-
-		// wait till you get atleast one message
-		MapMessage m;
-		for (m = null; m == null;)
-			m = (MapMessage) consumer.receive();
-		System.out.println("Consumed: " + m.toString());
-		assertMessage(m);
-		Assert.assertEquals(m.getString(ARG.feedNames.getArgName()),
-				"click-logs");
-		Assert.assertEquals(m.getString(ARG.feedInstancePaths.getArgName()),
-				"/click-logs/10/05/05/00/20");
-
-		for (m = null; m == null;)
-			m = (MapMessage) consumer.receive();
-		System.out.println("Consumed: " + m.toString());
-		assertMessage(m);
-		Assert.assertEquals(m.getString(ARG.feedNames.getArgName()), "raw-logs");
-		Assert.assertEquals(m.getString(ARG.feedInstancePaths.getArgName()),
-				"/raw-logs/10/05/05/00/20");
-		connection.close();
-	}
-
-	private void assertMessage(MapMessage m) throws JMSException {
-		Assert.assertEquals(m.getString(ARG.entityName.getArgName()),
-				TOPIC_NAME);
-		Assert.assertEquals(m.getString(ARG.workflowId.getArgName()),
-				"workflow-01-00");
-		Assert.assertEquals(m.getString(ARG.runId.getArgName()), "1");
-		Assert.assertEquals(m.getString(ARG.nominalTime.getArgName()),
-				"2011-01-01T01:00Z");
-		Assert.assertEquals(m.getString(ARG.timeStamp.getArgName()),
-				"2012-01-01T01:00Z");
-		Assert.assertEquals(m.getString(ARG.status.getArgName()), "SUCCEEDED");
-	}
+    private String[] args;
+    private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true";
+    // private static final String BROKER_URL =
+    // "tcp://localhost:61616?daemon=true";
+    private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
+    private static final String TOPIC_NAME = "FALCON.PROCESS";
+    private BrokerService broker;
+
+    private volatile AssertionError error;
+
+    @BeforeClass
+    public void setup() throws Exception {
+        args = new String[]{"-" + ARG.entityName.getArgName(), TOPIC_NAME,
+                            "-" + ARG.feedNames.getArgName(), "click-logs,raw-logs",
+                            "-" + ARG.feedInstancePaths.getArgName(),
+                            "/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20",
+                            "-" + ARG.workflowId.getArgName(), "workflow-01-00",
+                            "-" + ARG.runId.getArgName(), "1",
+                            "-" + ARG.nominalTime.getArgName(), "2011-01-01-01-00",
+                            "-" + ARG.timeStamp.getArgName(), "2012-01-01-01-00",
+                            "-" + ARG.brokerUrl.getArgName(), BROKER_URL,
+                            "-" + ARG.brokerImplClass.getArgName(), (BROKER_IMPL_CLASS),
+                            "-" + ARG.entityType.getArgName(), ("process"),
+                            "-" + ARG.operation.getArgName(), ("GENERATE"),
+                            "-" + ARG.logFile.getArgName(), ("/logFile"),
+                            "-" + ARG.topicName.getArgName(), (TOPIC_NAME),
+                            "-" + ARG.status.getArgName(), ("SUCCEEDED"),
+                            "-" + ARG.brokerTTL.getArgName(), "10",
+                            "-" + ARG.cluster.getArgName(), "corp"};
+        broker = new BrokerService();
+        broker.addConnector(BROKER_URL);
+        broker.setDataDirectory("target/activemq");
+        broker.setBrokerName("localhost");
+        broker.setSchedulerSupport(true);
+        broker.start();
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        broker.deleteAllMessages();
+        broker.stop();
+    }
+
+    @Test
+    public void testProcessMessageCreator() throws Exception {
+
+        Thread t = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    consumer();
+                } catch (AssertionError e) {
+                    error = e;
+                } catch (JMSException ignore) {
+
+                }
+            }
+        };
+        t.start();
+        Thread.sleep(1500);
+        new MessageProducer().run(this.args);
+        t.join();
+        if (error != null) {
+            throw error;
+        }
+    }
+
+    private void consumer() throws JMSException {
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
+                BROKER_URL);
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false,
+                Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createTopic(TOPIC_NAME);
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // wait till you get at least one message
+        MapMessage m;
+        for (m = null; m == null; ) {
+            m = (MapMessage) consumer.receive();
+        }
+        System.out.println("Consumed: " + m.toString());
+        assertMessage(m);
+        Assert.assertEquals(m.getString(ARG.feedNames.getArgName()),
+                "click-logs");
+        Assert.assertEquals(m.getString(ARG.feedInstancePaths.getArgName()),
+                "/click-logs/10/05/05/00/20");
+
+        for (m = null; m == null; ) {
+            m = (MapMessage) consumer.receive();
+        }
+        System.out.println("Consumed: " + m.toString());
+        assertMessage(m);
+        Assert.assertEquals(m.getString(ARG.feedNames.getArgName()), "raw-logs");
+        Assert.assertEquals(m.getString(ARG.feedInstancePaths.getArgName()),
+                "/raw-logs/10/05/05/00/20");
+        connection.close();
+    }
+
+    private void assertMessage(MapMessage m) throws JMSException {
+        Assert.assertEquals(m.getString(ARG.entityName.getArgName()),
+                TOPIC_NAME);
+        Assert.assertEquals(m.getString(ARG.workflowId.getArgName()),
+                "workflow-01-00");
+        Assert.assertEquals(m.getString(ARG.runId.getArgName()), "1");
+        Assert.assertEquals(m.getString(ARG.nominalTime.getArgName()),
+                "2011-01-01T01:00Z");
+        Assert.assertEquals(m.getString(ARG.timeStamp.getArgName()),
+                "2012-01-01T01:00Z");
+        Assert.assertEquals(m.getString(ARG.status.getArgName()), "SUCCEEDED");
+    }
 }


Mime
View raw message