falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srik...@apache.org
Subject [08/47] Fixes for Checkstyle
Date Fri, 26 Apr 2013 15:50:24 GMT
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
index 19269c1..188c397 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
@@ -1,4 +1,3 @@
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -30,65 +29,65 @@ import org.apache.falcon.rerun.policy.RerunPolicyFactory;
 import org.apache.falcon.rerun.queue.DelayedQueue;
 
 public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
-		AbstractRerunHandler<RetryEvent, M> {
+        AbstractRerunHandler<RetryEvent, M> {
 
-	@Override
-	public void handleRerun(String cluster, String entityType,
-			String entityName, String nominalTime, String runId, String wfId,
-			long msgReceivedTime) {
-		try {
-			Entity entity = getEntity(entityType, entityName);
-			Retry retry = getRetry(entity);
+    @Override
+    public void handleRerun(String cluster, String entityType,
+                            String entityName, String nominalTime, String runId, String wfId,
+                            long msgReceivedTime) {
+        try {
+            Entity entity = getEntity(entityType, entityName);
+            Retry retry = getRetry(entity);
 
-			if (retry == null) {
-				LOG.warn("Retry not configured for entity:" + entityType + "("
-						+ entity.getName() + "), ignoring failed retries");
-				return;
-			}
+            if (retry == null) {
+                LOG.warn("Retry not configured for entity:" + entityType + "("
+                        + entity.getName() + "), ignoring failed retries");
+                return;
+            }
 
-			int attempts = retry.getAttempts();
-			Frequency delay = retry.getDelay();
-			PolicyType policy = retry.getPolicy();
-			int intRunId = Integer.parseInt(runId);
+            int attempts = retry.getAttempts();
+            Frequency delay = retry.getDelay();
+            PolicyType policy = retry.getPolicy();
+            int intRunId = Integer.parseInt(runId);
 
-			if (attempts > intRunId) {
-				AbstractRerunPolicy rerunPolicy = RerunPolicyFactory
-						.getRetryPolicy(policy);
-				long delayTime = rerunPolicy.getDelay(delay,
-						Integer.parseInt(runId));
-				RetryEvent event = new RetryEvent(cluster, wfId,
-						msgReceivedTime, delayTime, entityType, entityName,
-						nominalTime, intRunId, attempts, 0);
-				offerToQueue(event);
-			} else {
-				LOG.warn("All retry attempt failed out of configured: "
-						+ attempts + " attempt for entity instance::"
-						+ entityName + ":" + nominalTime + " And WorkflowId: "
-						+ wfId);
+            if (attempts > intRunId) {
+                AbstractRerunPolicy rerunPolicy = RerunPolicyFactory
+                        .getRetryPolicy(policy);
+                long delayTime = rerunPolicy.getDelay(delay,
+                        Integer.parseInt(runId));
+                RetryEvent event = new RetryEvent(cluster, wfId,
+                        msgReceivedTime, delayTime, entityType, entityName,
+                        nominalTime, intRunId, attempts, 0);
+                offerToQueue(event);
+            } else {
+                LOG.warn("All retry attempt failed out of configured: "
+                        + attempts + " attempt for entity instance::"
+                        + entityName + ":" + nominalTime + " And WorkflowId: "
+                        + wfId);
 
-				GenericAlert.alertRetryFailed(entityType, entityName,
-						nominalTime, wfId, runId,
-						"All retry attempt failed out of configured: "
-								+ attempts + " attempt for entity instance::");
-			}
-		} catch (Exception e) {
-			LOG.error("Error during retry of entity instance " + entityName
-					+ ":" + nominalTime, e);
-			GenericAlert.alertRetryFailed(entityType, entityName, nominalTime,
-					wfId, runId, e.getMessage());
-		}
+                GenericAlert.alertRetryFailed(entityType, entityName,
+                        nominalTime, wfId, runId,
+                        "All retry attempt failed out of configured: "
+                                + attempts + " attempt for entity instance::");
+            }
+        } catch (Exception e) {
+            LOG.error("Error during retry of entity instance " + entityName
+                    + ":" + nominalTime, e);
+            GenericAlert.alertRetryFailed(entityType, entityName, nominalTime,
+                    wfId, runId, e.getMessage());
+        }
 
-	}
+    }
 
-	@Override
-	public void init(M queue) throws FalconException {
-		super.init(queue);
-		Thread daemon = new Thread(new RetryConsumer(this));
-		daemon.setName("RetryHandler");
-		daemon.setDaemon(true);
-		daemon.start();
-		LOG.info("RetryHandler  thread started");
+    @Override
+    public void init(M queue) throws FalconException {
+        super.init(queue);
+        Thread daemon = new Thread(new RetryConsumer(this));
+        daemon.setName("RetryHandler");
+        daemon.setDaemon(true);
+        daemon.start();
+        LOG.info("RetryHandler  thread started");
 
-	}
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/policy/AbstractRerunPolicy.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/policy/AbstractRerunPolicy.java b/rerun/src/main/java/org/apache/falcon/rerun/policy/AbstractRerunPolicy.java
index ca6c354..4bfbef2 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/policy/AbstractRerunPolicy.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/policy/AbstractRerunPolicy.java
@@ -17,28 +17,28 @@
  */
 package org.apache.falcon.rerun.policy;
 
-import java.util.Date;
-
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.expression.ExpressionHelper;
 
+import java.util.Date;
+
 public abstract class AbstractRerunPolicy {
 
-	public long getDurationInMilliSec(Frequency frequency)
-			throws FalconException {
-		ExpressionHelper helper = ExpressionHelper.get();
-		return helper.evaluate(frequency.toString(), Long.class);
+    public long getDurationInMilliSec(Frequency frequency)
+            throws FalconException {
+        ExpressionHelper helper = ExpressionHelper.get();
+        return helper.evaluate(frequency.toString(), Long.class);
 
-	}
+    }
 
-	public static Date addTime(Date date, int milliSecondsToAdd) {
-		return new Date(date.getTime() + milliSecondsToAdd);
-	}
+    public static Date addTime(Date date, int milliSecondsToAdd) {
+        return new Date(date.getTime() + milliSecondsToAdd);
+    }
 
-	public abstract long getDelay(Frequency delay, int eventNumber)
-			throws FalconException;
+    public abstract long getDelay(Frequency delay, int eventNumber)
+            throws FalconException;
 
-	public abstract long getDelay(Frequency delay, Date nominaltime,
-			Date cutOffTime) throws FalconException;
+    public abstract long getDelay(Frequency delay, Date nominaltime,
+                                  Date cutOffTime) throws FalconException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/policy/ExpBackoffPolicy.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/policy/ExpBackoffPolicy.java b/rerun/src/main/java/org/apache/falcon/rerun/policy/ExpBackoffPolicy.java
index 31ff052..b23e014 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/policy/ExpBackoffPolicy.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/policy/ExpBackoffPolicy.java
@@ -17,43 +17,44 @@
  */
 package org.apache.falcon.rerun.policy;
 
-import java.util.Date;
-
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.expression.ExpressionHelper;
 
+import java.util.Date;
+
 public class ExpBackoffPolicy extends AbstractRerunPolicy {
 
-	@Override
-	public long getDelay(Frequency delay, int eventNumber)
-			throws FalconException {
-		return (long) (getDurationInMilliSec(delay) * Math.pow(getPower(),
-				eventNumber));
-	}
-
-	@Override
-	public long getDelay(Frequency delay, Date nominalTime, Date cutOffTime)
-			throws FalconException {
-		ExpressionHelper evaluator = ExpressionHelper.get();
-		Date now = new Date();
-		Date lateTime = nominalTime;
-		long delayMilliSeconds = evaluator.evaluate(delay.toString(),
-				Long.class);
-		int factor = 1;
-		// TODO we can get rid of this using formula
-		while (lateTime.compareTo(now)<=0) {
-			lateTime = addTime(lateTime, (int) (factor * delayMilliSeconds));
-			factor *= getPower();
-		}
-		if (lateTime.after(cutOffTime))
-			lateTime = cutOffTime;
-		return (lateTime.getTime() - nominalTime.getTime());
-
-	}
-
-	protected int getPower() {
-		return 2;
-	}
+    @Override
+    public long getDelay(Frequency delay, int eventNumber)
+            throws FalconException {
+        return (long) (getDurationInMilliSec(delay) * Math.pow(getPower(),
+                eventNumber));
+    }
+
+    @Override
+    public long getDelay(Frequency delay, Date nominalTime, Date cutOffTime)
+            throws FalconException {
+        ExpressionHelper evaluator = ExpressionHelper.get();
+        Date now = new Date();
+        Date lateTime = nominalTime;
+        long delayMilliSeconds = evaluator.evaluate(delay.toString(),
+                Long.class);
+        int factor = 1;
+        // TODO we can get rid of this using formula
+        while (lateTime.compareTo(now) <= 0) {
+            lateTime = addTime(lateTime, (int) (factor * delayMilliSeconds));
+            factor *= getPower();
+        }
+        if (lateTime.after(cutOffTime)) {
+            lateTime = cutOffTime;
+        }
+        return (lateTime.getTime() - nominalTime.getTime());
+
+    }
+
+    protected int getPower() {
+        return 2;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/policy/FinalPolicy.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/policy/FinalPolicy.java b/rerun/src/main/java/org/apache/falcon/rerun/policy/FinalPolicy.java
index c8c400b..19fe8f7 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/policy/FinalPolicy.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/policy/FinalPolicy.java
@@ -18,11 +18,11 @@
 
 package org.apache.falcon.rerun.policy;
 
-import java.util.Date;
-
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.Frequency;
 
+import java.util.Date;
+
 public class FinalPolicy extends AbstractRerunPolicy {
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/policy/PeriodicPolicy.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/policy/PeriodicPolicy.java b/rerun/src/main/java/org/apache/falcon/rerun/policy/PeriodicPolicy.java
index 83107ec..9bcca26 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/policy/PeriodicPolicy.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/policy/PeriodicPolicy.java
@@ -18,8 +18,8 @@
 package org.apache.falcon.rerun.policy;
 
 public class PeriodicPolicy extends ExpBackoffPolicy {
-	@Override
-	protected int getPower() {
-		return 1;
-	}
+    @Override
+    protected int getPower() {
+        return 1;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/policy/RerunPolicyFactory.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/policy/RerunPolicyFactory.java b/rerun/src/main/java/org/apache/falcon/rerun/policy/RerunPolicyFactory.java
index f7ac42f..5ee902a 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/policy/RerunPolicyFactory.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/policy/RerunPolicyFactory.java
@@ -28,7 +28,7 @@ public class RerunPolicyFactory {
     public static AbstractRerunPolicy getRetryPolicy(PolicyType latePolicy) {
         switch (latePolicy) {
             case PERIODIC:
-                return new  PeriodicPolicy();
+                return new PeriodicPolicy();
 
             case EXP_BACKOFF:
                 return new ExpBackoffPolicy();

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java b/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java
index 73de658..82dfc22 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java
@@ -17,17 +17,6 @@
  */
 package org.apache.falcon.rerun.queue;
 
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ScheduledMessage;
@@ -35,120 +24,124 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.rerun.event.RerunEvent;
 import org.apache.falcon.rerun.event.RerunEventFactory;
 
-public class ActiveMQueue<T extends RerunEvent> extends DelayedQueue<T> {
-
-	private ActiveMQConnection connection;
-	private String brokerUrl;
-	private String destinationName;
-	private Destination destination;
-	private MessageProducer producer;
-	private MessageConsumer consumer;
-
-	public ActiveMQueue(String brokerUrl, String destinationName) {
-		this.brokerUrl = brokerUrl;
-		this.destinationName = destinationName;
-	}
-
-	@Override
-	public boolean offer(T event) throws FalconException {
-		Session session;
-		try {
-			session = getSession();
-			TextMessage msg = session.createTextMessage(event.toString());
-			msg.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,
-					event.getDelay(TimeUnit.MILLISECONDS));
-			msg.setStringProperty("TYPE", event.getType().name());
-			producer.send(msg);
-			LOG.debug("Enqueued Message:" + event.toString() + "with delay "
-					+ event.getDelay(TimeUnit.MILLISECONDS) + " milli sec");
-			return true;
-		} catch (Exception e) {
-			LOG.error("Unable to offer event:" + event + " to activeMqueue", e);
-			throw new FalconException("Unable to offer event:" + event + " to activeMqueue", e);
-		}
-	}
-
-	private Session getSession() throws Exception {
-		if (connection == null) {
-			init();
-		}
-		Session session = connection.createSession(false,
-				Session.AUTO_ACKNOWLEDGE);
-		return session;
-	}
-
-	@Override
-	public T take() throws FalconException {
-		try {
-			TextMessage textMessage = (TextMessage) consumer.receive();
-			T event = new RerunEventFactory<T>().getRerunEvent(
-					textMessage.getStringProperty("TYPE"),
-					textMessage.getText());
-			LOG.debug("Dequeued Message:" + event.toString());
-			return event;
-		} catch (Exception e) {
-			LOG.error("Error getting the messge from ActiveMqueue: ", e);
-			throw new FalconException("Error getting the messge from ActiveMqueue: ", e);
-		}
-	}
-
-	@Override
-	public void populateQueue(List<T> events) {
-		// TODO Auto-generated method stub
-
-	}
-
-	@Override
-	public void init() {
-		try {
-			createAndStartConnection("", "", brokerUrl);
-			Session session = connection.createSession(false,
-					Session.AUTO_ACKNOWLEDGE);
-			destination = session.createQueue(destinationName);
-			producer = session.createProducer(destination);
-			consumer = session.createConsumer(destination);
-			LOG.info("Initialized Queue on activeMQ: " + destinationName);
-		} catch (Exception e) {
-			LOG.error(
-					"Error starting ActiveMQueue connection for dealyed queue",
-					e);
-			throw new RuntimeException(
-					"Error starting ActiveMQueue connection for delayed queue",
-					e);
-		}
-	}
-
-	private void createAndStartConnection(String userName, String password,
-			String url) throws JMSException {
-		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
-				userName, password, url);
-		connection = (ActiveMQConnection) connectionFactory.createConnection();
-		connection.start();
-		LOG.info("Connected successfully to " + url);
-	}
-
-	@Override
-	public void reconnect() throws FalconException {
-		try {
-			LOG.info("Attempting to close producer");
-			producer.close();
-			LOG.info("Producer closed successfully");
-		} catch (Exception ignore) {
-		}
-		try {
-			LOG.info("Attempting to close consumer");
-			consumer.close();
-			LOG.info("Consumer closed successfully");
-		} catch (Exception ignore) {
-		}
-		try {
-			LOG.info("Attempting to close connection");
-			connection.close();
-			LOG.info("Connection closed successfully");
-		} catch (Exception ignore) {
-		}
+import javax.jms.*;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 
-		init();
+public class ActiveMQueue<T extends RerunEvent> extends DelayedQueue<T> {
 
-	}
+    private ActiveMQConnection connection;
+    private String brokerUrl;
+    private String destinationName;
+    private Destination destination;
+    private MessageProducer producer;
+    private MessageConsumer consumer;
+
+    public ActiveMQueue(String brokerUrl, String destinationName) {
+        this.brokerUrl = brokerUrl;
+        this.destinationName = destinationName;
+    }
+
+    @Override
+    public boolean offer(T event) throws FalconException {
+        Session session;
+        try {
+            session = getSession();
+            TextMessage msg = session.createTextMessage(event.toString());
+            msg.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,
+                    event.getDelay(TimeUnit.MILLISECONDS));
+            msg.setStringProperty("TYPE", event.getType().name());
+            producer.send(msg);
+            LOG.debug("Enqueued Message:" + event.toString() + "with delay "
+                    + event.getDelay(TimeUnit.MILLISECONDS) + " milli sec");
+            return true;
+        } catch (Exception e) {
+            LOG.error("Unable to offer event:" + event + " to activeMqueue", e);
+            throw new FalconException("Unable to offer event:" + event + " to activeMqueue", e);
+        }
+    }
+
+    private Session getSession() throws Exception {
+        if (connection == null) {
+            init();
+        }
+        Session session = connection.createSession(false,
+                Session.AUTO_ACKNOWLEDGE);
+        return session;
+    }
+
+    @Override
+    public T take() throws FalconException {
+        try {
+            TextMessage textMessage = (TextMessage) consumer.receive();
+            T event = new RerunEventFactory<T>().getRerunEvent(
+                    textMessage.getStringProperty("TYPE"),
+                    textMessage.getText());
+            LOG.debug("Dequeued Message:" + event.toString());
+            return event;
+        } catch (Exception e) {
+            LOG.error("Error getting the messge from ActiveMqueue: ", e);
+            throw new FalconException("Error getting the messge from ActiveMqueue: ", e);
+        }
+    }
+
+    @Override
+    public void populateQueue(List<T> events) {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void init() {
+        try {
+            createAndStartConnection("", "", brokerUrl);
+            Session session = connection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            destination = session.createQueue(destinationName);
+            producer = session.createProducer(destination);
+            consumer = session.createConsumer(destination);
+            LOG.info("Initialized Queue on activeMQ: " + destinationName);
+        } catch (Exception e) {
+            LOG.error(
+                    "Error starting ActiveMQueue connection for dealyed queue",
+                    e);
+            throw new RuntimeException(
+                    "Error starting ActiveMQueue connection for delayed queue",
+                    e);
+        }
+    }
+
+    private void createAndStartConnection(String userName, String password,
+                                          String url) throws JMSException {
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
+                userName, password, url);
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+        LOG.info("Connected successfully to " + url);
+    }
+
+    @Override
+    public void reconnect() throws FalconException {
+        try {
+            LOG.info("Attempting to close producer");
+            producer.close();
+            LOG.info("Producer closed successfully");
+        } catch (Exception ignore) {
+        }
+        try {
+            LOG.info("Attempting to close consumer");
+            consumer.close();
+            LOG.info("Consumer closed successfully");
+        } catch (Exception ignore) {
+        }
+        try {
+            LOG.info("Attempting to close connection");
+            connection.close();
+            LOG.info("Connection closed successfully");
+        } catch (Exception ignore) {
+        }
+
+        init();
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/queue/DelayedQueue.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/queue/DelayedQueue.java b/rerun/src/main/java/org/apache/falcon/rerun/queue/DelayedQueue.java
index 0c63f8e..7e55206 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/queue/DelayedQueue.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/queue/DelayedQueue.java
@@ -17,22 +17,23 @@
  */
 package org.apache.falcon.rerun.queue;
 
-import java.util.List;
-
 import org.apache.falcon.FalconException;
 import org.apache.falcon.rerun.event.RerunEvent;
 import org.apache.log4j.Logger;
 
-public abstract class  DelayedQueue<T extends RerunEvent> {
-	public static final Logger LOG = Logger.getLogger(DelayedQueue.class);
-	public abstract boolean offer(T event) throws FalconException;
-
-	public abstract T take() throws FalconException;
-	
-	public abstract void populateQueue(List<T> events);
-	
-	public abstract void init();
-	
-	public abstract void reconnect() throws FalconException;
+import java.util.List;
+
+public abstract class DelayedQueue<T extends RerunEvent> {
+    public static final Logger LOG = Logger.getLogger(DelayedQueue.class);
+
+    public abstract boolean offer(T event) throws FalconException;
+
+    public abstract T take() throws FalconException;
+
+    public abstract void populateQueue(List<T> events);
+
+    public abstract void init();
+
+    public abstract void reconnect() throws FalconException;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java b/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
index ff92f3d..7243f4d 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
@@ -17,130 +17,125 @@
  */
 package org.apache.falcon.rerun.queue;
 
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.DelayQueue;
-
 import org.apache.falcon.FalconException;
 import org.apache.falcon.aspect.GenericAlert;
 import org.apache.falcon.rerun.event.RerunEvent;
 import org.apache.falcon.rerun.event.RerunEventFactory;
 import org.apache.log4j.Logger;
 
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.DelayQueue;
+
 public class InMemoryQueue<T extends RerunEvent> extends DelayedQueue<T> {
-	public static final Logger LOG = Logger.getLogger(DelayedQueue.class);
-	protected DelayQueue<T> QUEUE = new DelayQueue<T>();
-	private File serializeFilePath;
-
-	@Override
-	public boolean offer(T event) {
-		boolean flag = QUEUE.offer(event);
-		beforeRetry(event);
-		LOG.debug("Enqueued Message:" + event.toString());
-		return flag;
-	}
-
-	@Override
-	public T take() throws FalconException {
-		T event;
-		try {
-			event = QUEUE.take();
-			LOG.debug("Dequeued Message:" + event.toString());
-			afterRetry(event);
-		} catch (InterruptedException e) {
-			throw new FalconException(e);
-		}
-		return event;
-	}
-
-	public InMemoryQueue(File serializeFilePath) {
-		this.serializeFilePath = serializeFilePath;
-	}
-
-	public void populateQueue(List<T> events) {
-		for (T event : events) {
-			QUEUE.offer(event);
-		}
-
-	}
-
-	@Override
-	public void init() {
-		List<T> events = bootstrap();
-		populateQueue(events);
-
-	}
-
-	@Override
-	public void reconnect() throws FalconException {
-		//Do Nothing
-	}
-	
-	private void beforeRetry(T event) {
-		File retryFile = getRetryFile(serializeFilePath, event);
-		try {
-			BufferedWriter out = new BufferedWriter(new FileWriter(retryFile,
-					true));
-			out.write(event.toString());
-			out.newLine();
-			out.close();
-		} catch (IOException e) {
-			LOG.warn(
-					"Unable to write entry for process-instance: "
-							+ event.getEntityName() + ":"
-							+ event.getInstance(), e);
-		}
-	}
-
-	private File getRetryFile(File basePath, T event) {
-		return new File(basePath, (event.getType().name()) + "-"
-				+ event.getEntityName() + "-"
-				+ event.getInstance().replaceAll(":", "-"));
-	}
-
-	private void afterRetry(T event) {
-		File retryFile = getRetryFile(serializeFilePath, event);
-		if (!retryFile.exists()) {
-			LOG.warn("Rerun file deleted or renamed for process-instance: "
-					+ event.getEntityName() + ":" + event.getInstance());
-			GenericAlert.alertRetryFailed(event.getEntityType(),
-					event.getEntityName(), event.getInstance(),
-					event.getWfId(), Integer.toString(event.getRunId()),
-					"Rerun file deleted or renamed for process-instance:");
-		} else {
-			if (!retryFile.delete()) {
-				LOG.warn("Unable to remove rerun file " + event.getWfId());
-				retryFile.deleteOnExit();
-			}
-		}
-	}
-
-	private List<T> bootstrap() {
-		List<T> rerunEvents = new ArrayList<T>();
-		for (File rerunFile : this.serializeFilePath.listFiles()) {
-			try {
-				BufferedReader reader = new BufferedReader(new FileReader(
-						rerunFile));
-				String line;
-				while ((line = reader.readLine()) != null) {
-					line.split("");
-					T event = new RerunEventFactory<T>().getRerunEvent(
-							rerunFile.getName(), line);
-					rerunEvents.add(event);
-				}
-			} catch (Exception e) {
-				LOG.warn(
-						"Not able to read rerun entry "
-								+ rerunFile.getAbsolutePath(), e);
-			}
-		}
-		return rerunEvents;
-
-	}
+    public static final Logger LOG = Logger.getLogger(DelayedQueue.class);
+    protected DelayQueue<T> QUEUE = new DelayQueue<T>();
+    private File serializeFilePath;
+
+    @Override
+    public boolean offer(T event) {
+        boolean flag = QUEUE.offer(event);
+        beforeRetry(event);
+        LOG.debug("Enqueued Message:" + event.toString());
+        return flag;
+    }
+
+    @Override
+    public T take() throws FalconException {
+        T event;
+        try {
+            event = QUEUE.take();
+            LOG.debug("Dequeued Message:" + event.toString());
+            afterRetry(event);
+        } catch (InterruptedException e) {
+            throw new FalconException(e);
+        }
+        return event;
+    }
+
+    public InMemoryQueue(File serializeFilePath) {
+        this.serializeFilePath = serializeFilePath;
+    }
+
+    public void populateQueue(List<T> events) {
+        for (T event : events) {
+            QUEUE.offer(event);
+        }
+
+    }
+
+    @Override
+    public void init() {
+        List<T> events = bootstrap();
+        populateQueue(events);
+
+    }
+
+    @Override
+    public void reconnect() throws FalconException {
+        //Do Nothing
+    }
+
+    private void beforeRetry(T event) {
+        File retryFile = getRetryFile(serializeFilePath, event);
+        try {
+            BufferedWriter out = new BufferedWriter(new FileWriter(retryFile,
+                    true));
+            out.write(event.toString());
+            out.newLine();
+            out.close();
+        } catch (IOException e) {
+            LOG.warn(
+                    "Unable to write entry for process-instance: "
+                            + event.getEntityName() + ":"
+                            + event.getInstance(), e);
+        }
+    }
+
+    private File getRetryFile(File basePath, T event) {
+        return new File(basePath, (event.getType().name()) + "-"
+                + event.getEntityName() + "-"
+                + event.getInstance().replaceAll(":", "-"));
+    }
+
+    private void afterRetry(T event) {
+        File retryFile = getRetryFile(serializeFilePath, event);
+        if (!retryFile.exists()) {
+            LOG.warn("Rerun file deleted or renamed for process-instance: "
+                    + event.getEntityName() + ":" + event.getInstance());
+            GenericAlert.alertRetryFailed(event.getEntityType(),
+                    event.getEntityName(), event.getInstance(),
+                    event.getWfId(), Integer.toString(event.getRunId()),
+                    "Rerun file deleted or renamed for process-instance:");
+        } else {
+            if (!retryFile.delete()) {
+                LOG.warn("Unable to remove rerun file " + event.getWfId());
+                retryFile.deleteOnExit();
+            }
+        }
+    }
+
+    private List<T> bootstrap() {
+        List<T> rerunEvents = new ArrayList<T>();
+        for (File rerunFile : this.serializeFilePath.listFiles()) {
+            try {
+                BufferedReader reader = new BufferedReader(new FileReader(
+                        rerunFile));
+                String line;
+                while ((line = reader.readLine()) != null) {
+                    line.split("");
+                    T event = new RerunEventFactory<T>().getRerunEvent(
+                            rerunFile.getName(), line);
+                    rerunEvents.add(event);
+                }
+            } catch (Exception e) {
+                LOG.warn(
+                        "Not able to read rerun entry "
+                                + rerunFile.getAbsolutePath(), e);
+            }
+        }
+        return rerunEvents;
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java b/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java
index d446a06..f8edfbc 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java
@@ -17,8 +17,6 @@
  */
 package org.apache.falcon.rerun.service;
 
-import java.io.File;
-
 import org.apache.falcon.FalconException;
 import org.apache.falcon.rerun.event.LaterunEvent;
 import org.apache.falcon.rerun.event.RerunEvent.RerunType;
@@ -29,42 +27,44 @@ import org.apache.falcon.service.FalconService;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.log4j.Logger;
 
+import java.io.File;
+
 public class LateRunService implements FalconService {
 
-	private static final Logger LOG = Logger.getLogger(LateRunService.class);
+    private static final Logger LOG = Logger.getLogger(LateRunService.class);
 
-	@Override
-	public String getName() {
-		return LateRunService.class.getName();
-	}
+    @Override
+    public String getName() {
+        return LateRunService.class.getName();
+    }
 
-	@Override
-	public void init() throws FalconException {
-		AbstractRerunHandler<LaterunEvent, ActiveMQueue<LaterunEvent>> rerunHandler = RerunHandlerFactory
-				.getRerunHandler(RerunType.LATE);
-		ActiveMQueue<LaterunEvent> queue = new ActiveMQueue<LaterunEvent>(
-				StartupProperties
-						.get()
-						.getProperty("broker.url",
-								"failover:(tcp://localhost:61616)?initialReconnectDelay=5000"),
-				"falcon.late.queue");
-		rerunHandler.init(queue);
-	}
+    @Override
+    public void init() throws FalconException {
+        AbstractRerunHandler<LaterunEvent, ActiveMQueue<LaterunEvent>> rerunHandler = RerunHandlerFactory
+                .getRerunHandler(RerunType.LATE);
+        ActiveMQueue<LaterunEvent> queue = new ActiveMQueue<LaterunEvent>(
+                StartupProperties
+                        .get()
+                        .getProperty("broker.url",
+                                "failover:(tcp://localhost:61616)?initialReconnectDelay=5000"),
+                "falcon.late.queue");
+        rerunHandler.init(queue);
+    }
 
-	@Override
-	public void destroy() throws FalconException {
-		LOG.info("LateRun  thread destroyed");
-	}
+    @Override
+    public void destroy() throws FalconException {
+        LOG.info("LateRun  thread destroyed");
+    }
 
-	private File getBasePath() {
-		File basePath = new File(StartupProperties.get().getProperty(
-				"rerun.recorder.path", "/tmp/falcon/rerun"));
-		if ((!basePath.exists() && !basePath.mkdirs())
-				|| (basePath.exists() && !basePath.canWrite())) {
-			throw new RuntimeException("Unable to initialize late recorder @"
-					+ basePath);
-		}
-		return basePath;
-	}
+    private File getBasePath() {
+        File basePath = new File(StartupProperties.get().getProperty(
+                "rerun.recorder.path", "/tmp/falcon/rerun"));
+        if ((!basePath.exists() && !basePath.mkdirs())
+                || (basePath.exists() && !basePath.canWrite())) {
+            throw new RuntimeException("Unable to initialize late recorder @"
+                    + basePath);
+        }
+        return basePath;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/service/RetryService.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/service/RetryService.java b/rerun/src/main/java/org/apache/falcon/rerun/service/RetryService.java
index f0d0939..8a902be 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/service/RetryService.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/service/RetryService.java
@@ -17,8 +17,6 @@
  */
 package org.apache.falcon.rerun.service;
 
-import java.io.File;
-
 import org.apache.falcon.FalconException;
 import org.apache.falcon.rerun.event.RerunEvent.RerunType;
 import org.apache.falcon.rerun.event.RetryEvent;
@@ -30,38 +28,40 @@ import org.apache.falcon.service.FalconService;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.log4j.Logger;
 
+import java.io.File;
+
 public class RetryService implements FalconService {
 
-	private static final Logger LOG = Logger.getLogger(RetryService.class);
+    private static final Logger LOG = Logger.getLogger(RetryService.class);
 
-	@Override
-	public String getName() {
-		return "Falcon Retry failed Instance";
-	}
+    @Override
+    public String getName() {
+        return "Falcon Retry failed Instance";
+    }
 
-	@Override
-	public void init() throws FalconException {
-		AbstractRerunHandler<RetryEvent, DelayedQueue<RetryEvent>> rerunHandler = RerunHandlerFactory
-				.getRerunHandler(RerunType.RETRY);
-		 InMemoryQueue<RetryEvent> queue = new InMemoryQueue<RetryEvent>(
-		 getBasePath());
-		rerunHandler.init(queue);
-	}
+    @Override
+    public void init() throws FalconException {
+        AbstractRerunHandler<RetryEvent, DelayedQueue<RetryEvent>> rerunHandler = RerunHandlerFactory
+                .getRerunHandler(RerunType.RETRY);
+        InMemoryQueue<RetryEvent> queue = new InMemoryQueue<RetryEvent>(
+                getBasePath());
+        rerunHandler.init(queue);
+    }
 
-	@Override
-	public void destroy() throws FalconException {
-		LOG.info("RetryHandler  thread destroyed");
-	}
+    @Override
+    public void destroy() throws FalconException {
+        LOG.info("RetryHandler  thread destroyed");
+    }
 
-	private File getBasePath() {
-		File basePath = new File(StartupProperties.get().getProperty(
-				"retry.recorder.path", "/tmp/falcon/retry"));
-		if ((!basePath.exists() && !basePath.mkdirs())
-				|| (basePath.exists() && !basePath.canWrite())) {
-			throw new RuntimeException("Unable to initialize retry recorder @"
-					+ basePath);
-		}
-		return basePath;
-	}
+    private File getBasePath() {
+        File basePath = new File(StartupProperties.get().getProperty(
+                "retry.recorder.path", "/tmp/falcon/retry"));
+        if ((!basePath.exists() && !basePath.mkdirs())
+                || (basePath.exists() && !basePath.canWrite())) {
+            throw new RuntimeException("Unable to initialize retry recorder @"
+                    + basePath);
+        }
+        return basePath;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/test/java/org/apache/falcon/rerun/AbstractRerunPolicyTest.java
----------------------------------------------------------------------
diff --git a/rerun/src/test/java/org/apache/falcon/rerun/AbstractRerunPolicyTest.java b/rerun/src/test/java/org/apache/falcon/rerun/AbstractRerunPolicyTest.java
index 494ddc6..be74032 100644
--- a/rerun/src/test/java/org/apache/falcon/rerun/AbstractRerunPolicyTest.java
+++ b/rerun/src/test/java/org/apache/falcon/rerun/AbstractRerunPolicyTest.java
@@ -17,8 +17,6 @@
  */
 package org.apache.falcon.rerun;
 
-import java.util.Date;
-
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.rerun.policy.AbstractRerunPolicy;
@@ -27,67 +25,69 @@ import org.apache.falcon.rerun.policy.PeriodicPolicy;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import java.util.Date;
+
 public class AbstractRerunPolicyTest {
 
-	@Test
-	public void TestGetDurationInMillis() throws FalconException {
-		AbstractRerunPolicy policy = new AbstractRerunPolicy() {
+    @Test
+    public void TestGetDurationInMillis() throws FalconException {
+        AbstractRerunPolicy policy = new AbstractRerunPolicy() {
+
+            @Override
+            public long getDelay(Frequency delay, Date nominaltime,
+                                 Date cutOffTime) throws FalconException {
+                // TODO Auto-generated method stub
+                return 0;
+            }
 
-			@Override
-			public long getDelay(Frequency delay, Date nominaltime,
-					Date cutOffTime) throws FalconException {
-				// TODO Auto-generated method stub
-				return 0;
-			}
+            @Override
+            public long getDelay(Frequency delay, int eventNumber)
+                    throws FalconException {
+                // TODO Auto-generated method stub
+                return 0;
+            }
+        };
 
-			@Override
-			public long getDelay(Frequency delay, int eventNumber)
-					throws FalconException {
-				// TODO Auto-generated method stub
-				return 0;
-			}
-		};
+        Frequency frequency = new Frequency("minutes(1)");
+        Assert.assertEquals(policy.getDurationInMilliSec(frequency), 60000);
+        frequency = new Frequency("minutes(15)");
+        Assert.assertEquals(policy.getDurationInMilliSec(frequency), 900000);
+        frequency = new Frequency("hours(2)");
+        Assert.assertEquals(policy.getDurationInMilliSec(frequency), 7200000);
+    }
 
-		Frequency frequency = new Frequency("minutes(1)");
-		Assert.assertEquals(policy.getDurationInMilliSec(frequency), 60000);
-		frequency = new Frequency("minutes(15)");
-		Assert.assertEquals(policy.getDurationInMilliSec(frequency), 900000);
-		frequency = new Frequency("hours(2)");
-		Assert.assertEquals(policy.getDurationInMilliSec(frequency), 7200000);
-	}
+    @Test
+    public void TestExpBackoffPolicy() throws FalconException {
+        AbstractRerunPolicy backoff = new ExpBackoffPolicy();
+        long delay = backoff.getDelay(new Frequency("minutes(2)"), 2);
+        Assert.assertEquals(delay, 480000);
 
-	@Test
-	public void TestExpBackoffPolicy() throws FalconException {
-		AbstractRerunPolicy backoff = new ExpBackoffPolicy();
-		long delay = backoff.getDelay(new Frequency("minutes(2)"), 2);
-		Assert.assertEquals(delay, 480000);
+        long currentTime = System.currentTimeMillis();
+        delay = backoff.getDelay(new Frequency("minutes(2)"), new Date(
+                currentTime - 1 * 4 * 60 * 1000), new Date(currentTime + 1 * 60
+                * 60 * 1000));
+        Assert.assertEquals(delay, 1 * 6 * 60 * 1000);
 
-		long currentTime = System.currentTimeMillis();
-		delay = backoff.getDelay(new Frequency("minutes(2)"), new Date(
-				currentTime - 1 * 4 * 60 * 1000), new Date(currentTime + 1 * 60
-				* 60 * 1000));
-		Assert.assertEquals(delay, 1 * 6 * 60 * 1000);
-		
-		currentTime = System.currentTimeMillis();
-		delay = backoff.getDelay(new Frequency("minutes(1)"), new Date(
-				currentTime - 1 * 9 * 60 * 1000), new Date(currentTime + 1 * 60
-				* 60 * 1000));
-		Assert.assertEquals(delay, 900000);
-	}
+        currentTime = System.currentTimeMillis();
+        delay = backoff.getDelay(new Frequency("minutes(1)"), new Date(
+                currentTime - 1 * 9 * 60 * 1000), new Date(currentTime + 1 * 60
+                * 60 * 1000));
+        Assert.assertEquals(delay, 900000);
+    }
 
-	@Test
-	public void TestPeriodicPolicy() throws FalconException, InterruptedException {
-		AbstractRerunPolicy periodic = new PeriodicPolicy();
-		long delay = periodic.getDelay(new Frequency("minutes(2)"), 2);
-		Assert.assertEquals(delay, 120000);
-		delay = periodic.getDelay(new Frequency("minutes(2)"), 5);
-		Assert.assertEquals(delay, 120000);
+    @Test
+    public void TestPeriodicPolicy() throws FalconException, InterruptedException {
+        AbstractRerunPolicy periodic = new PeriodicPolicy();
+        long delay = periodic.getDelay(new Frequency("minutes(2)"), 2);
+        Assert.assertEquals(delay, 120000);
+        delay = periodic.getDelay(new Frequency("minutes(2)"), 5);
+        Assert.assertEquals(delay, 120000);
 
-		long currentTime = System.currentTimeMillis();
-		//Thread.sleep(1000);
-		delay = periodic.getDelay(new Frequency("minutes(3)"), new Date(
-				currentTime), new Date(currentTime + 1 * 60
-				* 60 * 1000));
-		Assert.assertEquals(delay, 180000);
-	}
+        long currentTime = System.currentTimeMillis();
+        //Thread.sleep(1000);
+        delay = periodic.getDelay(new Frequency("minutes(3)"), new Date(
+                currentTime), new Date(currentTime + 1 * 60
+                * 60 * 1000));
+        Assert.assertEquals(delay, 180000);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/test/java/org/apache/falcon/rerun/handler/TestLateData.java
----------------------------------------------------------------------
diff --git a/rerun/src/test/java/org/apache/falcon/rerun/handler/TestLateData.java b/rerun/src/test/java/org/apache/falcon/rerun/handler/TestLateData.java
index b29f830..6028e10 100644
--- a/rerun/src/test/java/org/apache/falcon/rerun/handler/TestLateData.java
+++ b/rerun/src/test/java/org/apache/falcon/rerun/handler/TestLateData.java
@@ -19,7 +19,6 @@
 package org.apache.falcon.rerun.handler;
 
 import com.sun.jersey.api.client.WebResource;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.store.ConfigurationStore;
@@ -30,6 +29,7 @@ import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.conf.Configuration;
 import org.mockito.MockitoAnnotations;
 import org.testng.annotations.BeforeClass;
 
@@ -40,32 +40,33 @@ import java.io.StringWriter;
 import java.util.Collection;
 
 public class TestLateData {
-	
-	protected static final String FEED_XML = "/feed-template.xml";
+
+    protected static final String FEED_XML = "/feed-template.xml";
     protected static String CLUSTER_XML = "/cluster-template.xml";
     protected static final String PROCESS_XML = "/process-template.xml";
     protected static final String PROCESS_XML2 = "/process-template2.xml";
-    
+
     protected WebResource service = null;
     protected Configuration conf = new Configuration();
 
     @BeforeClass
     public void initConfigStore() throws Exception {
-    	MockitoAnnotations.initMocks(this);
+        MockitoAnnotations.initMocks(this);
         cleanupStore();
         String listeners = StartupProperties.get().getProperty("configstore.listeners");
-        StartupProperties.get().setProperty("configstore.listeners", 
+        StartupProperties.get().setProperty("configstore.listeners",
                 listeners.replace("org.apache.falcon.service.SharedLibraryHostingService", ""));
         ConfigurationStore.get().init();
-        
+
     }
-    
+
     protected void cleanupStore() throws FalconException {
         ConfigurationStore store = ConfigurationStore.get();
-        for(EntityType type:EntityType.values()) {
+        for (EntityType type : EntityType.values()) {
             Collection<String> entities = store.getEntities(type);
-            for(String entity:entities)
+            for (String entity : entities) {
                 store.remove(type, entity);
+            }
         }
     }
 
@@ -73,8 +74,8 @@ public class TestLateData {
         Unmarshaller unmarshaller = type.getUnmarshaller();
         ConfigurationStore store = ConfigurationStore.get();
         store.remove(type, name);
-		switch (type) {
-		case CLUSTER:
+        switch (type) {
+            case CLUSTER:
                 Cluster cluster = (Cluster) unmarshaller.unmarshal(this.getClass().getResource(CLUSTER_XML));
                 cluster.setName(name);
                 ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(conf.get("fs.default.name"));
@@ -96,14 +97,14 @@ public class TestLateData {
     }
 
     public void setup() throws Exception {
-		ConfigurationStore store = ConfigurationStore.get();
-		for (EntityType type : EntityType.values()) {
-			for (String name : store.getEntities(type)) {
-				store.remove(type, name);
-			}
-		}
-		storeEntity(EntityType.CLUSTER , "testCluster");
-		storeEntity(EntityType.PROCESS, "sample");
+        ConfigurationStore store = ConfigurationStore.get();
+        for (EntityType type : EntityType.values()) {
+            for (String name : store.getEntities(type)) {
+                store.remove(type, name);
+            }
+        }
+        storeEntity(EntityType.CLUSTER, "testCluster");
+        storeEntity(EntityType.PROCESS, "sample");
         storeEntity(EntityType.FEED, "raw-logs");
         storeEntity(EntityType.FEED, "clicks");
         Unmarshaller unmarshaller = EntityType.PROCESS.getUnmarshaller();
@@ -112,14 +113,14 @@ public class TestLateData {
         store.publish(EntityType.PROCESS, process);
     }
 
-	public String marshallEntity(final Entity entity) throws FalconException,
-			JAXBException {
-		Marshaller marshaller = entity.getEntityType().getMarshaller();
-		StringWriter stringWriter = new StringWriter();
-		marshaller.marshal(entity, stringWriter);
-		return stringWriter.toString();
-	}
-	
+    public String marshallEntity(final Entity entity) throws FalconException,
+                                                             JAXBException {
+        Marshaller marshaller = entity.getEntityType().getMarshaller();
+        StringWriter stringWriter = new StringWriter();
+        marshaller.marshal(entity, stringWriter);
+        return stringWriter.toString();
+    }
+
 //	@Test
 //	private void TestLateWhenInstanceRunning() throws Exception
 //	{
@@ -189,5 +190,5 @@ public class TestLateData {
 //		Assert.assertNotSame(noFilesBefore, noFilesAfterRetry);
 //
 //	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
----------------------------------------------------------------------
diff --git a/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java b/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
index 0990b57..a13aa02 100644
--- a/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
+++ b/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
@@ -18,7 +18,6 @@
 package org.apache.falcon.rerun.queue;
 
 import junit.framework.Assert;
-
 import org.apache.activemq.broker.BrokerService;
 import org.apache.falcon.rerun.event.LaterunEvent;
 import org.apache.falcon.rerun.event.RerunEvent;
@@ -27,39 +26,39 @@ import org.testng.annotations.Test;
 
 public class ActiveMQTest {
 
-	private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true";
-	private BrokerService broker;
-	private static final String DESTI = "activemq.topic";
+    private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true";
+    private BrokerService broker;
+    private static final String DESTI = "activemq.topic";
+
+    @BeforeClass
+    private void setup() throws Exception {
+        broker = new BrokerService();
+        broker.setDataDirectory("target/activemq");
+        broker.addConnector(BROKER_URL);
+        broker.setBrokerName("localhost");
+        broker.setSchedulerSupport(true);
+        broker.start();
+    }
+
+    @Test
+    public void testBrokerStartAndEnqueue() {
+        ActiveMQueue<RerunEvent> activeMQueue = new ActiveMQueue<RerunEvent>(
+                BROKER_URL, DESTI);
+        activeMQueue.init();
+        RerunEvent event = new LaterunEvent("clusterName", "wfId",
+                System.currentTimeMillis(), 60 * 1000, "entityType",
+                "entityName", "instance", 0);
 
-	@BeforeClass
-	private void setup() throws Exception {
-		broker = new BrokerService();
-		broker.setDataDirectory("target/activemq");
-		broker.addConnector(BROKER_URL);
-		broker.setBrokerName("localhost");
-		broker.setSchedulerSupport(true);
-		broker.start();
-	}
+        try {
+            activeMQueue.offer(event);
+            broker.stop();
+            broker.start();
+            activeMQueue.reconnect();
+            activeMQueue.offer(event);
+        } catch (Exception e) {
+            Assert.fail();
+        }
 
-	@Test
-	public void testBrokerStartAndEnqueue() {
-		ActiveMQueue<RerunEvent> activeMQueue = new ActiveMQueue<RerunEvent>(
-				BROKER_URL, DESTI);
-		activeMQueue.init();
-		RerunEvent event = new LaterunEvent("clusterName", "wfId",
-				System.currentTimeMillis(), 60 * 1000, "entityType",
-				"entityName", "instance", 0);
-	
-		try{
-		activeMQueue.offer(event);
-		broker.stop();
-		broker.start();
-		activeMQueue.reconnect();
-		activeMQueue.offer(event);
-		}catch(Exception e){
-			Assert.fail();
-		}
 
-	
-	}
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
----------------------------------------------------------------------
diff --git a/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java b/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
index 5bd35e5..d41453b 100644
--- a/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
+++ b/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
@@ -27,7 +27,7 @@ import java.util.LinkedList;
 
 public class InMemoryQueueTest {
 
-    @Test (timeOut = 10000)
+    @Test(timeOut = 10000)
     public void testDelayedQueue() throws Exception {
         runTest();
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/test/resources/cluster-template.xml
----------------------------------------------------------------------
diff --git a/rerun/src/test/resources/cluster-template.xml b/rerun/src/test/resources/cluster-template.xml
index 46a9805..658365f 100644
--- a/rerun/src/test/resources/cluster-template.xml
+++ b/rerun/src/test/resources/cluster-template.xml
@@ -16,21 +16,22 @@
   limitations under the License.
   -->
 
-<cluster colo="gs" description="" name="corp" xmlns="uri:falcon:cluster:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-        <interfaces>
-                <interface type="readonly" endpoint="http://localhost:50070"
-                        version="0.20.2" />
-                <interface type="write" endpoint="hdfs://localhost:8020"                        version="0.20.2" />
-                <interface type="execute" endpoint="localhost:8021" version="0.20.2" />
-                <interface type="workflow" endpoint="http://localhost:11000/oozie/"
-                        version="3.1" />
-                <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
-                        version="5.1.6" />
-                <interface type="registry" endpoint="Hcat" version="1" />
-        </interfaces>
-        <locations>
-               <location name="temp" path="/tmp" />
-                <location name="working" path="/projects/falcon/working" />
-                  <location name="staging" path="/projects/falcon/staging" />
-         </locations>
- </cluster>
\ No newline at end of file
+<cluster colo="gs" description="" name="corp" xmlns="uri:falcon:cluster:0.1"
+        >
+    <interfaces>
+        <interface type="readonly" endpoint="http://localhost:50070"
+                   version="0.20.2"/>
+        <interface type="write" endpoint="hdfs://localhost:8020" version="0.20.2"/>
+        <interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
+        <interface type="workflow" endpoint="http://localhost:11000/oozie/"
+                   version="3.1"/>
+        <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
+                   version="5.1.6"/>
+        <interface type="registry" endpoint="Hcat" version="1"/>
+    </interfaces>
+    <locations>
+        <location name="temp" path="/tmp"/>
+        <location name="working" path="/projects/falcon/working"/>
+        <location name="staging" path="/projects/falcon/staging"/>
+    </locations>
+</cluster>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/test/resources/feed-template.xml
----------------------------------------------------------------------
diff --git a/rerun/src/test/resources/feed-template.xml b/rerun/src/test/resources/feed-template.xml
index 6a277fe..c3f2834 100644
--- a/rerun/src/test/resources/feed-template.xml
+++ b/rerun/src/test/resources/feed-template.xml
@@ -17,27 +17,28 @@
   -->
 
 <feed description="clicks log" name="raw-logs" xmlns="uri:falcon:feed:0.1"
-	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+        >
 
-	<frequency>minutes(20)</frequency>
+    <frequency>minutes(20)</frequency>
     <timezone>UTC</timezone>
-    
-	<late-arrival cut-off="minutes(6)" />
-	<clusters>
-		<cluster name="corp" type="source">
-			<validity start="2012-05-01T00:00Z" end="2012-12-31T23:59Z"/>
-			<retention limit="months(36)" action="delete" /> <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
-		</cluster>
-	</clusters>
-
-	<locations>
-		<location type="data"
-			path="/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}" />
-		<location type="stats" path="/projects/falcon/clicksStats" />
-		<location type="meta" path="/projects/falcon/clicksMetaData" />
-	</locations>
-
-	<ACL owner="testuser" group="group" permission="0x755" />
-	<schema location="/schema/clicks" provider="protobuf" />
+
+    <late-arrival cut-off="minutes(6)"/>
+    <clusters>
+        <cluster name="corp" type="source">
+            <validity start="2012-05-01T00:00Z" end="2012-12-31T23:59Z"/>
+            <retention limit="months(36)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data"
+                  path="/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}"/>
+        <location type="stats" path="/projects/falcon/clicksStats"/>
+        <location type="meta" path="/projects/falcon/clicksMetaData"/>
+    </locations>
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
 
 </feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/test/resources/process-template.xml
----------------------------------------------------------------------
diff --git a/rerun/src/test/resources/process-template.xml b/rerun/src/test/resources/process-template.xml
index b7b4eb4..5a66586 100644
--- a/rerun/src/test/resources/process-template.xml
+++ b/rerun/src/test/resources/process-template.xml
@@ -17,10 +17,10 @@
   limitations under the License.
   -->
 
-<process name="sample" xmlns="uri:falcon:process:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+<process name="sample" xmlns="uri:falcon:process:0.1">
     <clusters>
         <cluster name="testCluster">
-            <validity end="2013-01-01T01:20Z" start="2010-01-01T01:00Z" />
+            <validity end="2013-01-01T01:20Z" start="2010-01-01T01:00Z"/>
         </cluster>
     </clusters>
     <concurrency>1</concurrency>
@@ -29,21 +29,21 @@
     <timezone>UTC</timezone>
 
     <inputs>
-        <input end-instance="today(20,20)" start-instance="today(0,-60)" feed="raw-logs" name="inputData" />
+        <input end-instance="today(20,20)" start-instance="today(0,-60)" feed="raw-logs" name="inputData"/>
     </inputs>
     <outputs>
-        <output instance="now(0,0)" feed="agg-logs" name="outputData" />
+        <output instance="now(0,0)" feed="agg-logs" name="outputData"/>
     </outputs>
     <properties>
-        <property name="queueName" value="default" />
-        <property name="field1" value="value1" />
-        <property name="field3" value="value3" />
-        <property name="field7" value="value7" />
+        <property name="queueName" value="default"/>
+        <property name="field1" value="value1"/>
+        <property name="field3" value="value3"/>
+        <property name="field7" value="value7"/>
     </properties>
-    <workflow path="/examples/apps/aggregator" />
-    <retry policy="exp-backoff" delay="minutes(2)" attempts="2" />
+    <workflow path="/examples/apps/aggregator"/>
+    <retry policy="exp-backoff" delay="minutes(2)" attempts="2"/>
 
     <late-process policy="periodic" delay="minutes(1)">
-        <late-input feed="inputData" workflow-path="hdfs://impression/late/workflow" />
+        <late-input feed="inputData" workflow-path="hdfs://impression/late/workflow"/>
     </late-process>
 </process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/test/resources/process-template2.xml
----------------------------------------------------------------------
diff --git a/rerun/src/test/resources/process-template2.xml b/rerun/src/test/resources/process-template2.xml
index 014f0e6..bbdcfe0 100644
--- a/rerun/src/test/resources/process-template2.xml
+++ b/rerun/src/test/resources/process-template2.xml
@@ -17,36 +17,36 @@
   limitations under the License.
   -->
 
-<process name="sample2" xmlns="uri:falcon:process:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+<process name="sample2" xmlns="uri:falcon:process:0.1">
     <clusters>
         <cluster name="testCluster">
-            <validity end="2013-01-01T01:20Z" start="2010-01-01T01:00Z" />
+            <validity end="2013-01-01T01:20Z" start="2010-01-01T01:00Z"/>
         </cluster>
     </clusters>
     <concurrency>1</concurrency>
     <execution>FIFO</execution>
     <frequency>minutes(5)</frequency>
     <timezone>UTC</timezone>
-	<inputs>
-		<input end-instance="yesterday(4,20)" start-instance="yesterday(0,0)"
-			feed="raw-logs" name="inputData" />
-		<input end-instance="yesterday(16,0)" start-instance="yesterday(12,0)"
-			feed="raw-logs" name="inputData2" />
-	</inputs>
-	<outputs>
-		<output instance="now(0,0)" feed="agg-logs" name="outputData" />
-	</outputs>
-	<properties>
-		<property name="queueName" value="default" />
-		<property name="field1" value="value1" />
-		<property name="field3" value="value3" />
-		<property name="field7" value="value7" />
-	</properties>
-	<workflow path="/examples/apps/aggregator" />
-	<retry policy="exp-backoff" delay="minutes(2)"	attempts="2" />
+    <inputs>
+        <input end-instance="yesterday(4,20)" start-instance="yesterday(0,0)"
+               feed="raw-logs" name="inputData"/>
+        <input end-instance="yesterday(16,0)" start-instance="yesterday(12,0)"
+               feed="raw-logs" name="inputData2"/>
+    </inputs>
+    <outputs>
+        <output instance="now(0,0)" feed="agg-logs" name="outputData"/>
+    </outputs>
+    <properties>
+        <property name="queueName" value="default"/>
+        <property name="field1" value="value1"/>
+        <property name="field3" value="value3"/>
+        <property name="field7" value="value7"/>
+    </properties>
+    <workflow path="/examples/apps/aggregator"/>
+    <retry policy="exp-backoff" delay="minutes(2)" attempts="2"/>
 
-	<late-process policy="exp-backoff" delay="hours(1)">
-		<late-input feed="inputData" workflow-path="hdfs://impression/late/workflow" />
-		<late-input feed="inputData2" workflow-path="hdfs://impression/late/workflow" />
-	</late-process>
+    <late-process policy="exp-backoff" delay="hours(1)">
+        <late-input feed="inputData" workflow-path="hdfs://impression/late/workflow"/>
+        <late-input feed="inputData2" workflow-path="hdfs://impression/late/workflow"/>
+    </late-process>
 </process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
----------------------------------------------------------------------
diff --git a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
index dedb945..14c2c1c 100644
--- a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
+++ b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
@@ -18,33 +18,15 @@
 
 package org.apache.falcon.retention;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.TimeZone;
-import java.util.TreeMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.servlet.jsp.el.ELException;
-import javax.servlet.jsp.el.ExpressionEvaluator;
-
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.el.ExpressionEvaluatorImpl;
+import org.apache.falcon.Pair;
+import org.apache.falcon.entity.common.FeedDataPath;
+import org.apache.falcon.entity.common.FeedDataPath.VARS;
+import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
@@ -53,12 +35,18 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.falcon.Pair;
-import org.apache.falcon.entity.common.FeedDataPath.VARS;
-import org.apache.falcon.entity.common.FeedDataPath;
-import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.log4j.Logger;
 
+import javax.servlet.jsp.el.ELException;
+import javax.servlet.jsp.el.ExpressionEvaluator;
+import java.io.*;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 /**
  * Feed Evictor is called only if the retention policy that applies
  * to the feed is that of delete.
@@ -91,73 +79,73 @@ public class FeedEvictor extends Configured implements Tool {
 
     private FileSystem fs;
     private Map<VARS, String> map = new TreeMap<VARS, String>();
-	private final StringBuffer instancePaths = new StringBuffer("instancePaths=");
-	private final StringBuffer buffer = new StringBuffer();
+    private final StringBuffer instancePaths = new StringBuffer("instancePaths=");
+    private final StringBuffer buffer = new StringBuffer();
 
     @Override
     public int run(String[] args) throws Exception {
-        
-    	CommandLine cmd = getCommand(args);
-        String feedBasePath = cmd.getOptionValue("feedBasePath").replaceAll("\\?\\{", "\\$\\{");        
+
+        CommandLine cmd = getCommand(args);
+        String feedBasePath = cmd.getOptionValue("feedBasePath").replaceAll("\\?\\{", "\\$\\{");
         String retentionType = cmd.getOptionValue("retentionType");
         String retentionLimit = cmd.getOptionValue("retentionLimit");
         String timeZone = cmd.getOptionValue("timeZone");
         String frequency = cmd.getOptionValue("frequency"); //to write out smart path filters
-        String logFile=cmd.getOptionValue("logFile");
+        String logFile = cmd.getOptionValue("logFile");
 
-        String []feedLocs = feedBasePath.split("#");
-        for(String path: feedLocs){
-        	evictor(path, retentionType, retentionLimit, timeZone, frequency, logFile);
+        String[] feedLocs = feedBasePath.split("#");
+        for (String path : feedLocs) {
+            evictor(path, retentionType, retentionLimit, timeZone, frequency, logFile);
         }
 
-		logInstancePaths(new Path(logFile), instancePaths.toString());
-		int len = buffer.length();
-		if (len > 0) {
-			stream.println("instances=" + buffer.substring(0, len - 1));
-		} else {
-			stream.println("instances=NULL");
-		}
+        logInstancePaths(new Path(logFile), instancePaths.toString());
+        int len = buffer.length();
+        if (len > 0) {
+            stream.println("instances=" + buffer.substring(0, len - 1));
+        } else {
+            stream.println("instances=NULL");
+        }
         return 0;
     }
-    
-	private void evictor(String feedBasePath, String retentionType,
-			String retentionLimit, String timeZone, String frequency,
-			String logFile) throws IOException, ELException {
-		Path normalizedPath = new Path(feedBasePath);
-		fs = normalizedPath.getFileSystem(getConf());
-		feedBasePath = normalizedPath.toUri().getPath();
-		LOG.info("Normalized path : " + feedBasePath);
-		Pair<Date, Date> range = getDateRange(retentionLimit);
-		String dateMask = getDateFormatInPath(feedBasePath);
-		List<Path> toBeDeleted = discoverInstanceToDelete(feedBasePath,
-				timeZone, dateMask, range.first);
-
-		LOG.info("Applying retention on " + feedBasePath + " type: "
-				+ retentionType + ", Limit: " + retentionLimit + ", timezone: "
-				+ timeZone + ", frequency: " + frequency);
-
-		DateFormat dateFormat = new SimpleDateFormat(format);
-		dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
-		for (Path path : toBeDeleted) {
-			if (deleteInstance(path)) {
-				LOG.info("Deleted instance " + path);
-				Date date = getDate(path, feedBasePath, dateMask, timeZone);
-				buffer.append(dateFormat.format(date)).append(',');
-				instancePaths.append(path).append(",");
-			}
-		}
-
-	}
+
+    private void evictor(String feedBasePath, String retentionType,
+                         String retentionLimit, String timeZone, String frequency,
+                         String logFile) throws IOException, ELException {
+        Path normalizedPath = new Path(feedBasePath);
+        fs = normalizedPath.getFileSystem(getConf());
+        feedBasePath = normalizedPath.toUri().getPath();
+        LOG.info("Normalized path : " + feedBasePath);
+        Pair<Date, Date> range = getDateRange(retentionLimit);
+        String dateMask = getDateFormatInPath(feedBasePath);
+        List<Path> toBeDeleted = discoverInstanceToDelete(feedBasePath,
+                timeZone, dateMask, range.first);
+
+        LOG.info("Applying retention on " + feedBasePath + " type: "
+                + retentionType + ", Limit: " + retentionLimit + ", timezone: "
+                + timeZone + ", frequency: " + frequency);
+
+        DateFormat dateFormat = new SimpleDateFormat(format);
+        dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
+        for (Path path : toBeDeleted) {
+            if (deleteInstance(path)) {
+                LOG.info("Deleted instance " + path);
+                Date date = getDate(path, feedBasePath, dateMask, timeZone);
+                buffer.append(dateFormat.format(date)).append(',');
+                instancePaths.append(path).append(",");
+            }
+        }
+
+    }
 
     private void logInstancePaths(Path path, String instancePaths) throws IOException {
-		LOG.info("Writing deleted instances to path " + path);
-		OutputStream out = fs.create(path);
-		out.write(instancePaths.getBytes());
-		out.close();
-		if(LOG.isDebugEnabled()){
-			debug(path, fs);
-		}
-	}
+        LOG.info("Writing deleted instances to path " + path);
+        OutputStream out = fs.create(path);
+        out.write(instancePaths.getBytes());
+        out.close();
+        if (LOG.isDebugEnabled()) {
+            debug(path, fs);
+        }
+    }
 
     private Pair<Date, Date> getDateRange(String period) throws ELException {
         Long duration = (Long) EVALUATOR.evaluate("${" + period + "}",
@@ -279,42 +267,42 @@ public class FeedEvictor extends Configured implements Tool {
     private boolean deleteInstance(Path path) throws IOException {
         return fs.delete(path, true);
     }
-    
-	private void debug(Path outPath, FileSystem fs) throws IOException {
-		ByteArrayOutputStream writer = new ByteArrayOutputStream();
-		InputStream instance = fs.open(outPath);
-		IOUtils.copyBytes(instance, writer, 4096, true);
-		LOG.debug("Instance Paths copied to " + outPath );
-		LOG.debug("Written "+writer);
-	}
-	
-	private CommandLine getCommand(String[] args)
-			throws org.apache.commons.cli.ParseException {
-		Options options = new Options();
-		Option opt;
-		opt = new Option("feedBasePath", true,
-				"base path for feed, ex /data/feed/${YEAR}-${MONTH}");
-		opt.setRequired(true);
-		options.addOption(opt);
-		opt = new Option("retentionType", true,
-				"type of retention policy like delete, archive etc");
-		opt.setRequired(true);
-		options.addOption(opt);
-		opt = new Option("retentionLimit", true,
-				"time limit for retention, ex hours(5), months(2), days(90)");
-		opt.setRequired(true);
-		options.addOption(opt);
-		opt = new Option("timeZone", true, "timezone for feed, ex UTC");
-		opt.setRequired(true);
-		options.addOption(opt);
-		opt = new Option("frequency", true,
-				"frequency of feed,  ex hourly, daily, monthly, minute, weekly, yearly");
-		opt.setRequired(true);
-		options.addOption(opt);
-		opt = new Option("logFile", true, "log file for capturing size of feed");
-		opt.setRequired(true);
-		options.addOption(opt);
-		return new GnuParser().parse(options, args);
-	}
+
+    private void debug(Path outPath, FileSystem fs) throws IOException {
+        ByteArrayOutputStream writer = new ByteArrayOutputStream();
+        InputStream instance = fs.open(outPath);
+        IOUtils.copyBytes(instance, writer, 4096, true);
+        LOG.debug("Instance Paths copied to " + outPath);
+        LOG.debug("Written " + writer);
+    }
+
+    private CommandLine getCommand(String[] args)
+            throws org.apache.commons.cli.ParseException {
+        Options options = new Options();
+        Option opt;
+        opt = new Option("feedBasePath", true,
+                "base path for feed, ex /data/feed/${YEAR}-${MONTH}");
+        opt.setRequired(true);
+        options.addOption(opt);
+        opt = new Option("retentionType", true,
+                "type of retention policy like delete, archive etc");
+        opt.setRequired(true);
+        options.addOption(opt);
+        opt = new Option("retentionLimit", true,
+                "time limit for retention, ex hours(5), months(2), days(90)");
+        opt.setRequired(true);
+        options.addOption(opt);
+        opt = new Option("timeZone", true, "timezone for feed, ex UTC");
+        opt.setRequired(true);
+        options.addOption(opt);
+        opt = new Option("frequency", true,
+                "frequency of feed,  ex hourly, daily, monthly, minute, weekly, yearly");
+        opt.setRequired(true);
+        options.addOption(opt);
+        opt = new Option("logFile", true, "log file for capturing size of feed");
+        opt.setRequired(true);
+        options.addOption(opt);
+        return new GnuParser().parse(options, args);
+    }
 
 }


Mime
View raw message