falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srik...@apache.org
Subject [18/47] Fixes for Checkstyle
Date Fri, 26 Apr 2013 15:50:34 GMT
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 65849f3..f0ef515 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -18,26 +18,9 @@
 
 package org.apache.falcon.entity;
 
-import java.lang.reflect.Method;
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimeZone;
-
 import org.apache.commons.beanutils.PropertyUtils;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.hadoop.fs.Path;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
 import org.apache.falcon.entity.WorkflowNameBuilder.WorkflowName;
@@ -49,57 +32,61 @@ import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.feed.Cluster;
 import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.LateInput;
-import org.apache.falcon.entity.v0.process.LateProcess;
-import org.apache.falcon.entity.v0.process.PolicyType;
+import org.apache.falcon.entity.v0.process.*;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.entity.v0.process.Retry;
 import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.util.RuntimeProperties;
+import org.apache.hadoop.fs.Path;
+
+import java.lang.reflect.Method;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
 
 public class EntityUtil {
-	private static final long MINUTE_IN_MS = 60000L;
-	private static final long HOUR_IN_MS = 3600000L;
-	private static final long DAY_IN_MS = 86400000L;
-	private static final long MONTH_IN_MS = 2592000000L;
-
-	public static <T extends Entity> T getEntity(EntityType type, String entityName) throws FalconException {
-		ConfigurationStore configStore = ConfigurationStore.get();
-		T entity = configStore.get(type, entityName);
-		if (entity == null) {
-			throw new EntityNotRegisteredException(entityName + " (" + type + ") not found");
-		}
-		return entity;        
-	}
-
-	public static <T extends Entity> T getEntity(String type, String entityName) throws FalconException {
-		EntityType entityType;
-		try {
-			entityType = EntityType.valueOf(type.toUpperCase());
-		} catch (IllegalArgumentException e) {
-			throw new FalconException("Invalid entity type: " + type, e);
-		}
-		return getEntity(entityType, entityName);
-	}
-
-	public static TimeZone getTimeZone(String tzId) {
-		if (tzId == null) {
-			throw new IllegalArgumentException("Invalid TimeZone: " + tzId);
-		}
-		TimeZone tz = TimeZone.getTimeZone(tzId);
-		if (!tzId.equals("GMT") && tz.getID().equals("GMT")) {
-			throw new IllegalArgumentException("Invalid TimeZone: " + tzId);
-		}
-		return tz;
-	}
-
-	public static Date getEndTime(Entity entity, String cluster) {
-		if (entity.getEntityType() == EntityType.PROCESS) {
-			return getEndTime((Process) entity, cluster);
-		} else {
-			return getEndTime((Feed) entity, cluster);
-		}
-	}
+    private static final long MINUTE_IN_MS = 60000L;
+    private static final long HOUR_IN_MS = 3600000L;
+    private static final long DAY_IN_MS = 86400000L;
+    private static final long MONTH_IN_MS = 2592000000L;
+
+    public static <T extends Entity> T getEntity(EntityType type, String entityName) throws FalconException {
+        ConfigurationStore configStore = ConfigurationStore.get();
+        T entity = configStore.get(type, entityName);
+        if (entity == null) {
+            throw new EntityNotRegisteredException(entityName + " (" + type + ") not found");
+        }
+        return entity;
+    }
+
+    public static <T extends Entity> T getEntity(String type, String entityName) throws FalconException {
+        EntityType entityType;
+        try {
+            entityType = EntityType.valueOf(type.toUpperCase());
+        } catch (IllegalArgumentException e) {
+            throw new FalconException("Invalid entity type: " + type, e);
+        }
+        return getEntity(entityType, entityName);
+    }
+
+    public static TimeZone getTimeZone(String tzId) {
+        if (tzId == null) {
+            throw new IllegalArgumentException("Invalid TimeZone: " + tzId);
+        }
+        TimeZone tz = TimeZone.getTimeZone(tzId);
+        if (!tzId.equals("GMT") && tz.getID().equals("GMT")) {
+            throw new IllegalArgumentException("Invalid TimeZone: " + tzId);
+        }
+        return tz;
+    }
+
+    public static Date getEndTime(Entity entity, String cluster) {
+        if (entity.getEntityType() == EntityType.PROCESS) {
+            return getEndTime((Process) entity, cluster);
+        } else {
+            return getEndTime((Feed) entity, cluster);
+        }
+    }
 
     public static Date parseDateUTC(String dateStr) throws FalconException {
         try {
@@ -109,100 +96,100 @@ public class EntityUtil {
         }
     }
 
-	public static Date getStartTime(Entity entity, String cluster) {
-		if (entity.getEntityType() == EntityType.PROCESS) {
-			return getStartTime((Process) entity, cluster);
-		} else {
-			return getStartTime((Feed) entity, cluster);
-		}
-	}
-
-	public static Date getEndTime(Process process, String cluster) {
-		org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, cluster);
-		return processCluster.getValidity().getEnd();
-	}
-
-	public static Date getStartTime(Process process, String cluster) {
-		org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, cluster);
-		return processCluster.getValidity().getStart();
-	}
-
-	public static Date getEndTime(Feed feed, String cluster) {
-		org.apache.falcon.entity.v0.feed.Cluster clusterDef = FeedHelper.getCluster(feed, cluster);
-		return clusterDef.getValidity().getEnd();
-	}
-
-	public static Date getStartTime(Feed feed, String cluster) {
-		org.apache.falcon.entity.v0.feed.Cluster clusterDef = FeedHelper.getCluster(feed, cluster);
-		return clusterDef.getValidity().getStart();
-	}
-
-	public static int getParallel(Entity entity) {
-		if (entity.getEntityType() == EntityType.PROCESS) {
-			return getParallel((Process) entity);
-		} else {
-			return getParallel((Feed) entity);
-		}
-	}
-
-	public static void setStartDate(Entity entity, String cluster, Date startDate) {
-		if (entity.getEntityType() == EntityType.PROCESS) {
-			setStartDate((Process) entity, cluster, startDate);
-		} else {
-			setStartDate((Feed) entity, cluster, startDate);
-		}
-	}
-
-	public static void setEndTime(Entity entity, String cluster, Date endDate) {
-		if (entity.getEntityType() == EntityType.PROCESS) {
-			setEndTime((Process) entity, cluster, endDate);
-		} else {
-			setEndTime((Feed) entity, cluster, endDate);
-		}
-	}
-
-	public static void setParallel(Entity entity, int parallel) {
-		if (entity.getEntityType() == EntityType.PROCESS) {
-			setParallel((Process) entity, parallel);
-		} else {
-			setParallel((Feed) entity, parallel);
-		}
-	}
-
-	public static int getParallel(Process process) {
-		return process.getParallel();
-	}
-
-	public static void setStartDate(Process process, String cluster, Date startDate) {
-		org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, cluster);
-		processCluster.getValidity().setStart(startDate);
-	}
-
-	public static void setParallel(Process process, int parallel) {
-		process.setParallel(parallel);
-	}
-
-	public static void setEndTime(Process process, String cluster, Date endDate) {
-		org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, cluster);
-		processCluster.getValidity().setEnd(endDate);
-	}
-
-	public static int getParallel(Feed feed) {
-		return 1;
-	}
-
-	public static void setStartDate(Feed feed, String cluster, Date startDate) {
-		org.apache.falcon.entity.v0.feed.Cluster clusterDef = FeedHelper.getCluster(feed, cluster);
-		clusterDef.getValidity().setStart(startDate);
-	}
-
-	public static void setEndTime(Feed feed, String cluster, Date endDate) {
-		org.apache.falcon.entity.v0.feed.Cluster clusterDef = FeedHelper.getCluster(feed, cluster);
-		clusterDef.getValidity().setStart(endDate);
-	}
-
-	public static void setParallel(Feed feed, int parallel) {
-	}
+    public static Date getStartTime(Entity entity, String cluster) {
+        if (entity.getEntityType() == EntityType.PROCESS) {
+            return getStartTime((Process) entity, cluster);
+        } else {
+            return getStartTime((Feed) entity, cluster);
+        }
+    }
+
+    public static Date getEndTime(Process process, String cluster) {
+        org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, cluster);
+        return processCluster.getValidity().getEnd();
+    }
+
+    public static Date getStartTime(Process process, String cluster) {
+        org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, cluster);
+        return processCluster.getValidity().getStart();
+    }
+
+    public static Date getEndTime(Feed feed, String cluster) {
+        org.apache.falcon.entity.v0.feed.Cluster clusterDef = FeedHelper.getCluster(feed, cluster);
+        return clusterDef.getValidity().getEnd();
+    }
+
+    public static Date getStartTime(Feed feed, String cluster) {
+        org.apache.falcon.entity.v0.feed.Cluster clusterDef = FeedHelper.getCluster(feed, cluster);
+        return clusterDef.getValidity().getStart();
+    }
+
+    public static int getParallel(Entity entity) {
+        if (entity.getEntityType() == EntityType.PROCESS) {
+            return getParallel((Process) entity);
+        } else {
+            return getParallel((Feed) entity);
+        }
+    }
+
+    public static void setStartDate(Entity entity, String cluster, Date startDate) {
+        if (entity.getEntityType() == EntityType.PROCESS) {
+            setStartDate((Process) entity, cluster, startDate);
+        } else {
+            setStartDate((Feed) entity, cluster, startDate);
+        }
+    }
+
+    public static void setEndTime(Entity entity, String cluster, Date endDate) {
+        if (entity.getEntityType() == EntityType.PROCESS) {
+            setEndTime((Process) entity, cluster, endDate);
+        } else {
+            setEndTime((Feed) entity, cluster, endDate);
+        }
+    }
+
+    public static void setParallel(Entity entity, int parallel) {
+        if (entity.getEntityType() == EntityType.PROCESS) {
+            setParallel((Process) entity, parallel);
+        } else {
+            setParallel((Feed) entity, parallel);
+        }
+    }
+
+    public static int getParallel(Process process) {
+        return process.getParallel();
+    }
+
+    public static void setStartDate(Process process, String cluster, Date startDate) {
+        org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, cluster);
+        processCluster.getValidity().setStart(startDate);
+    }
+
+    public static void setParallel(Process process, int parallel) {
+        process.setParallel(parallel);
+    }
+
+    public static void setEndTime(Process process, String cluster, Date endDate) {
+        org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, cluster);
+        processCluster.getValidity().setEnd(endDate);
+    }
+
+    public static int getParallel(Feed feed) {
+        return 1;
+    }
+
+    public static void setStartDate(Feed feed, String cluster, Date startDate) {
+        org.apache.falcon.entity.v0.feed.Cluster clusterDef = FeedHelper.getCluster(feed, cluster);
+        clusterDef.getValidity().setStart(startDate);
+    }
+
+    /**
+     * Sets the validity end date for the given feed on the given cluster.
+     */
+    public static void setEndTime(Feed feed, String cluster, Date endDate) {
+        org.apache.falcon.entity.v0.feed.Cluster clusterDef = FeedHelper.getCluster(feed, cluster);
+        // BUG FIX: was setStart(endDate), which overwrote the validity start instead of the end.
+        clusterDef.getValidity().setEnd(endDate);
+    }
+
+    public static void setParallel(Feed feed, int parallel) {
+    }
 
     public static Frequency getFrequency(Entity entity) {
         if (entity.getEntityType() == EntityType.PROCESS) {
@@ -236,75 +223,79 @@ public class EntityUtil {
         return feed.getTimezone();
     }
 
-	public static Date getNextStartTime(Date startTime, Frequency frequency, TimeZone timezone, Date now) {
-		if (startTime.after(now))
-			return startTime;
-
-		Calendar startCal = Calendar.getInstance(timezone);
-		startCal.setTime(startTime);
-
-		int count = 0;
-		switch (frequency.getTimeUnit()) {
-		case months:
-			count = (int) ((now.getTime() - startTime.getTime()) / MONTH_IN_MS);
-			break;
-		case days:
-			count = (int) ((now.getTime() - startTime.getTime()) / DAY_IN_MS);
-			break;
-		case hours:
-			count = (int) ((now.getTime() - startTime.getTime()) / HOUR_IN_MS);
-			break;
-		case minutes:
-			count = (int) ((now.getTime() - startTime.getTime()) / MINUTE_IN_MS);
-			break;
-		default:
-		}
-
-		if (count > 2) {
-			startCal.add(frequency.getTimeUnit().getCalendarUnit(), ((count - 2) / frequency.getFrequency()) * frequency.getFrequency());
-		}
-		while (startCal.getTime().before(now)) {
-			startCal.add(frequency.getTimeUnit().getCalendarUnit(), frequency.getFrequency());
-		}
-		return startCal.getTime();
-	}
-
-	public static int getInstanceSequence(Date startTime, Frequency frequency, TimeZone tz, Date instanceTime) {
-		if (startTime.after(instanceTime))
-			return -1;
-
-		Calendar startCal = Calendar.getInstance(tz);
-		startCal.setTime(startTime);
-
-		int count = 0;
-		switch (frequency.getTimeUnit()) {
-		case months:
-			count = (int) ((instanceTime.getTime() - startTime.getTime()) / MONTH_IN_MS);
-			break;
-		case days:
-			count = (int) ((instanceTime.getTime() - startTime.getTime()) / DAY_IN_MS);
-			break;
-		case hours:
-			count = (int) ((instanceTime.getTime() - startTime.getTime()) / HOUR_IN_MS);
-			break;
-		case minutes:
-			count = (int) ((instanceTime.getTime() - startTime.getTime()) / MINUTE_IN_MS);
-			break;
-		default:
-		}
-
-		if (count > 2) {
-			startCal.add(frequency.getTimeUnit().getCalendarUnit(), (count / frequency.getFrequency()) * frequency.getFrequency());
-			count = (count / frequency.getFrequency());
-		} else {
+    public static Date getNextStartTime(Date startTime, Frequency frequency, TimeZone timezone, Date now) {
+        if (startTime.after(now)) {
+            return startTime;
+        }
+
+        Calendar startCal = Calendar.getInstance(timezone);
+        startCal.setTime(startTime);
+
+        int count = 0;
+        switch (frequency.getTimeUnit()) {
+            case months:
+                count = (int) ((now.getTime() - startTime.getTime()) / MONTH_IN_MS);
+                break;
+            case days:
+                count = (int) ((now.getTime() - startTime.getTime()) / DAY_IN_MS);
+                break;
+            case hours:
+                count = (int) ((now.getTime() - startTime.getTime()) / HOUR_IN_MS);
+                break;
+            case minutes:
+                count = (int) ((now.getTime() - startTime.getTime()) / MINUTE_IN_MS);
+                break;
+            default:
+        }
+
+        if (count > 2) {
+            startCal.add(frequency.getTimeUnit().getCalendarUnit(),
+                    ((count - 2) / frequency.getFrequency()) * frequency.getFrequency());
+        }
+        while (startCal.getTime().before(now)) {
+            startCal.add(frequency.getTimeUnit().getCalendarUnit(), frequency.getFrequency());
+        }
+        return startCal.getTime();
+    }
+
+    public static int getInstanceSequence(Date startTime, Frequency frequency, TimeZone tz, Date instanceTime) {
+        if (startTime.after(instanceTime)) {
+            return -1;
+        }
+
+        Calendar startCal = Calendar.getInstance(tz);
+        startCal.setTime(startTime);
+
+        int count = 0;
+        switch (frequency.getTimeUnit()) {
+            case months:
+                count = (int) ((instanceTime.getTime() - startTime.getTime()) / MONTH_IN_MS);
+                break;
+            case days:
+                count = (int) ((instanceTime.getTime() - startTime.getTime()) / DAY_IN_MS);
+                break;
+            case hours:
+                count = (int) ((instanceTime.getTime() - startTime.getTime()) / HOUR_IN_MS);
+                break;
+            case minutes:
+                count = (int) ((instanceTime.getTime() - startTime.getTime()) / MINUTE_IN_MS);
+                break;
+            default:
+        }
+
+        if (count > 2) {
+            startCal.add(frequency.getTimeUnit().getCalendarUnit(),
+                    (count / frequency.getFrequency()) * frequency.getFrequency());
+            count = (count / frequency.getFrequency());
+        } else {
             count = 0;
         }
-		while (startCal.getTime().before(instanceTime)) {
-			startCal.add(frequency.getTimeUnit().getCalendarUnit(), frequency.getFrequency());
-			count++;
-		}
-		return count + 1;
-	}
+        while (startCal.getTime().before(instanceTime)) {
+            startCal.add(frequency.getTimeUnit().getCalendarUnit(), frequency.getFrequency());
+            count++;
+        }
+        return count + 1;
+    }
 
     public static String md5(Entity entity) throws FalconException {
         return new String(Hex.encodeHex(DigestUtils.md5(stringOf(entity))));
@@ -315,10 +306,12 @@ public class EntityUtil {
     }
 
     public static boolean equals(Entity lhs, Entity rhs, String[] filterProps) throws FalconException {
-        if (lhs == null && rhs == null)
+        if (lhs == null && rhs == null) {
             return true;
-        if (lhs == null || rhs == null)
+        }
+        if (lhs == null || rhs == null) {
             return false;
+        }
 
         if (lhs.equals(rhs)) {
             String lhsString = stringOf(lhs, filterProps);
@@ -332,38 +325,43 @@ public class EntityUtil {
     public static String stringOf(Entity entity) throws FalconException {
         return stringOf(entity, null);
     }
-    
+
     private static String stringOf(Entity entity, String[] filterProps) throws FalconException {
         Map<String, String> map = new HashMap<String, String>();
         mapToProperties(entity, null, map, filterProps);
         List<String> keyList = new ArrayList<String>(map.keySet());
         Collections.sort(keyList);
         StringBuilder builer = new StringBuilder();
-        for (String key : keyList)
+        for (String key : keyList) {
             builer.append(key).append('=').append(map.get(key)).append('\n');
+        }
         return builer.toString();
     }
 
     @SuppressWarnings("rawtypes")
-    private static void mapToProperties(Object obj, String name, Map<String, String> propMap, String[] filterProps) throws FalconException {
-        if (obj == null)
+    private static void mapToProperties(Object obj, String name, Map<String, String> propMap, String[] filterProps)
+            throws FalconException {
+        if (obj == null) {
             return;
+        }
 
-        if (filterProps != null && name != null)
+        if (filterProps != null && name != null) {
             for (String filter : filterProps) {
-                if (name.matches(filter.replace(".", "\\.").replace("[", "\\[").replace("]", "\\]")))
+                if (name.matches(filter.replace(".", "\\.").replace("[", "\\[").replace("]", "\\]"))) {
                     return;
+                }
             }
+        }
 
-        if (Date.class.isAssignableFrom(obj.getClass()))
-            propMap.put(name, SchemaHelper.formatDateUTC((Date)obj));
-        else if (obj.getClass().getPackage().getName().equals("java.lang"))
+        if (Date.class.isAssignableFrom(obj.getClass())) {
+            propMap.put(name, SchemaHelper.formatDateUTC((Date) obj));
+        } else if (obj.getClass().getPackage().getName().equals("java.lang")) {
             propMap.put(name, String.valueOf(obj));
-        else if (TimeZone.class.isAssignableFrom(obj.getClass()))
+        } else if (TimeZone.class.isAssignableFrom(obj.getClass())) {
             propMap.put(name, ((TimeZone) obj).getID());
-        else if (Enum.class.isAssignableFrom(obj.getClass()))
+        } else if (Enum.class.isAssignableFrom(obj.getClass())) {
             propMap.put(name, ((Enum) obj).name());
-        else if (List.class.isAssignableFrom(obj.getClass())) {
+        } else if (List.class.isAssignableFrom(obj.getClass())) {
             List list = (List) obj;
             for (int index = 0; index < list.size(); index++) {
                 mapToProperties(list.get(index), name + "[" + index + "]", propMap, filterProps);
@@ -376,206 +374,217 @@ public class EntityUtil {
                 try {
                     Map map = PropertyUtils.describe(obj);
                     for (Object key : map.keySet()) {
-                        if (!key.equals("class"))
-                            mapToProperties(map.get(key), name != null ? name + "." + key : (String)key, propMap, filterProps);
+                        if (!key.equals("class")) {
+                            mapToProperties(map.get(key), name != null ? name + "." + key : (String) key, propMap,
+                                    filterProps);
+                        }
                     }
                 } catch (Exception e1) {
                     throw new FalconException(e1);
                 }
-            } catch(Exception e) {
+            } catch (Exception e) {
                 throw new FalconException(e);
             }
         }
     }
 
     public static String getStagingPath(Entity entity) throws FalconException {
-		try {
-			return "falcon/workflows/" + entity.getEntityType().name().toLowerCase() + "/" + entity.getName() + "/"
-			+ md5(entity);
-		} catch (Exception e) {
-			throw new FalconException(e);
-		}
-	}
-
-	public static WorkflowName getWorkflowName(Tag tag, List<String> suffixes,
-			Entity entity) {
-		WorkflowNameBuilder<Entity> builder = new WorkflowNameBuilder<Entity>(
-				entity);
-		builder.setTag(tag);
-		builder.setSuffixes(suffixes);
-		return builder.getWorkflowName();
-	}
-
-	public static WorkflowName getWorkflowName(Tag tag, Entity entity) {
-		return getWorkflowName(tag, null, entity);
-	}
-
-	public static WorkflowName getWorkflowName(Entity entity) {
-		return getWorkflowName(null, null, entity);
-	}
-
-	public static String getWorkflowNameSuffix(String workflowName,
-			Entity entity) throws FalconException {
-		WorkflowNameBuilder<Entity> builder = new WorkflowNameBuilder<Entity>(
-				entity);
-		return builder.getWorkflowSuffixes(workflowName).replaceAll("_", "");
-	}
-
-	public static Tag getWorkflowNameTag(String workflowName, Entity entity) {
-		WorkflowNameBuilder<Entity> builder = new WorkflowNameBuilder<Entity>(
-				entity);
-		return builder.getWorkflowTag(workflowName);
-	}
-
-	public static <T extends Entity> T getClusterView(T entity, String clusterName) {
-	    switch(entity.getEntityType()) {
-	    case CLUSTER:
-	        return entity;
-	        
-	    case FEED:
-	        Feed feed = (Feed) entity.clone();
-	        Cluster feedCluster = FeedHelper.getCluster(feed, clusterName);
-	        Iterator<Cluster> itr = feed.getClusters().getClusters().iterator();
-	        while(itr.hasNext()) {
-	            Cluster cluster = itr.next();
-	            //In addition to retaining the required clster, retain the sources clusters if this is the target cluster
-	            //1. Retain cluster if cluster n
-	            if(!(cluster.getName().equals(clusterName) || 
-	                    (feedCluster.getType() == ClusterType.TARGET && cluster.getType() == ClusterType.SOURCE)))
-	                itr.remove();
-	        }
-	        return (T) feed;            
-	        
-	    case PROCESS:
-	        Process process = (Process) entity.clone();
-	        Iterator<org.apache.falcon.entity.v0.process.Cluster> procItr = process.getClusters().getClusters().iterator();
-	        while(procItr.hasNext()) {
-	            org.apache.falcon.entity.v0.process.Cluster cluster = procItr.next();
-	            if(!cluster.getName().equals(clusterName))
-	                procItr.remove();
-	        }
-	        return (T) process;
-	    }
-	    throw new UnsupportedOperationException("Not supported for entity type " + entity.getEntityType());
-	}
-	
-	public static Set<String> getClustersDefined(Entity entity) {
-	    Set<String> clusters = new HashSet<String>();
-		switch(entity.getEntityType()) {
-		case CLUSTER:
-		    clusters.add(entity.getName());
-		    break;
-		    
-		case FEED:
-			Feed feed = (Feed) entity;
-			for(Cluster cluster:feed.getClusters().getClusters())
-				clusters.add(cluster.getName());
-			break;
-			
-		case PROCESS:
-			Process process = (Process) entity;
-			for(org.apache.falcon.entity.v0.process.Cluster cluster:process.getClusters().getClusters())
-				clusters.add(cluster.getName());
-			break;
-		}  
-		return clusters;
-	}
-	
-	public static Set<String> getClustersDefinedInColos(Entity entity) {
-		Set<String> entityClusters = EntityUtil.getClustersDefined(entity);
-		if (DeploymentUtil.isEmbeddedMode())
-			return entityClusters;
-
-		Set<String> myClusters = DeploymentUtil.getCurrentClusters();
-		Set<String> applicableClusters = new HashSet<String>();
-		for (String cluster : entityClusters)
-			if (myClusters.contains(cluster))
-				applicableClusters.add(cluster);
-		return applicableClusters;
-	}
-
-	public static Path getStagingPath(
-			org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity)
-					throws FalconException {
-		try {
-			return new Path(ClusterHelper.getLocation(cluster, "staging"),
-					EntityUtil.getStagingPath(entity));
-		} catch (Exception e) {
-			throw new FalconException(e);
-		}
-	}
-
-	public static Retry getRetry(Entity entity) throws FalconException {
-		switch (entity.getEntityType()) {
-		case FEED:
-			if (!RuntimeProperties.get()
-					.getProperty("feed.retry.allowed", "true")
-					.equalsIgnoreCase("true")) {
-				return null;
-			}
-			Retry retry = new Retry();
-			retry.setAttempts(Integer.parseInt(RuntimeProperties.get()
-					.getProperty("feed.retry.attempts", "3")));
-			retry.setDelay(new Frequency(RuntimeProperties.get().getProperty(
-					"feed.retry.frequency", "minutes(5)")));
-			retry.setPolicy(PolicyType.fromValue(RuntimeProperties.get()
-					.getProperty("feed.retry.policy", "exp-backoff")));
-			return retry;
-		case PROCESS:
-			Process process = (Process) entity;
-			return process.getRetry();
-		default:
-			throw new FalconException("Cannot create Retry for entity:"+entity.getName());
-		}
-	}
-
-	public static LateProcess getLateProcess(Entity entity)
-			throws FalconException {
-		switch (entity.getEntityType()) {
-		case FEED:
-			if (!RuntimeProperties.get()
-					.getProperty("feed.late.allowed", "true")
-					.equalsIgnoreCase("true")) {
-				return null;
-			}
-			LateProcess lateProcess = new LateProcess();
-			lateProcess.setDelay(new Frequency(RuntimeProperties.get()
-					.getProperty("feed.late.frequency", "hours(3)")));
-			lateProcess.setPolicy(PolicyType.fromValue(RuntimeProperties.get()
-					.getProperty("feed.late.policy", "exp-backoff")));
-			LateInput lateInput = new LateInput();
-			lateInput.setInput(entity.getName());
-			//TODO - Assuming the late workflow is not used
-			lateInput.setWorkflowPath("ignore.xml");
-			lateProcess.getLateInputs().add(lateInput);
-			return lateProcess;
-		case PROCESS:
-			Process process = (Process) entity;
-			return process.getLateProcess();
-		default:
-			throw new FalconException("Cannot create Late Process for entity:"+entity.getName());
-		}
-	}
-	
-	public static Path getLogPath(
-			org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity)
-			throws FalconException {
-		Path logPath = new Path(ClusterHelper.getLocation(cluster,
-				"staging"), EntityUtil.getStagingPath(entity) + "/../logs");
-		return logPath;
-	}
-	
-	public static String UTCtoURIDate(String utc) throws FalconException {
-		DateFormat utcFormat = new SimpleDateFormat(
-				"yyyy'-'MM'-'dd'T'HH':'mm'Z'");
-		Date utcDate;
-		try {
-			utcDate = utcFormat.parse(utc);
-		} catch (ParseException e) {
-			throw new FalconException("Unable to parse utc date:", e);
-		}
-		DateFormat uriFormat = new SimpleDateFormat("yyyy'-'MM'-'dd'-'HH'-'mm");
-		return uriFormat.format(utcDate);
-	}
+        try {
+            return "falcon/workflows/" + entity.getEntityType().name().toLowerCase() + "/" + entity.getName() + "/"
+                    + md5(entity);
+        } catch (Exception e) {
+            throw new FalconException(e);
+        }
+    }
+
+    public static WorkflowName getWorkflowName(Tag tag, List<String> suffixes,
+                                               Entity entity) {
+        WorkflowNameBuilder<Entity> builder = new WorkflowNameBuilder<Entity>(
+                entity);
+        builder.setTag(tag);
+        builder.setSuffixes(suffixes);
+        return builder.getWorkflowName();
+    }
+
+    public static WorkflowName getWorkflowName(Tag tag, Entity entity) {
+        return getWorkflowName(tag, null, entity);
+    }
+
+    public static WorkflowName getWorkflowName(Entity entity) {
+        return getWorkflowName(null, null, entity);
+    }
+
+    public static String getWorkflowNameSuffix(String workflowName,
+                                               Entity entity) throws FalconException {
+        WorkflowNameBuilder<Entity> builder = new WorkflowNameBuilder<Entity>(
+                entity);
+        return builder.getWorkflowSuffixes(workflowName).replaceAll("_", "");
+    }
+
+    public static Tag getWorkflowNameTag(String workflowName, Entity entity) {
+        WorkflowNameBuilder<Entity> builder = new WorkflowNameBuilder<Entity>(
+                entity);
+        return builder.getWorkflowTag(workflowName);
+    }
+
+    public static <T extends Entity> T getClusterView(T entity, String clusterName) {
+        switch (entity.getEntityType()) {
+            case CLUSTER:
+                return entity;
+
+            case FEED:
+                Feed feed = (Feed) entity.clone();
+                Cluster feedCluster = FeedHelper.getCluster(feed, clusterName);
+                Iterator<Cluster> itr = feed.getClusters().getClusters().iterator();
+                while (itr.hasNext()) {
+                    Cluster cluster = itr.next();
+                    //In addition to retaining the required cluster, retain the source clusters if this is the target
+                    // cluster
+                    //1. Retain cluster if its name matches the requested cluster name
+                    if (!(cluster.getName().equals(clusterName) ||
+                            (feedCluster.getType() == ClusterType.TARGET && cluster.getType() == ClusterType.SOURCE))) {
+                        itr.remove();
+                    }
+                }
+                return (T) feed;
+
+            case PROCESS:
+                Process process = (Process) entity.clone();
+                Iterator<org.apache.falcon.entity.v0.process.Cluster> procItr
+                        = process.getClusters().getClusters().iterator();
+                while (procItr.hasNext()) {
+                    org.apache.falcon.entity.v0.process.Cluster cluster = procItr.next();
+                    if (!cluster.getName().equals(clusterName)) {
+                        procItr.remove();
+                    }
+                }
+                return (T) process;
+        }
+        throw new UnsupportedOperationException("Not supported for entity type " + entity.getEntityType());
+    }
+
+    public static Set<String> getClustersDefined(Entity entity) {
+        Set<String> clusters = new HashSet<String>();
+        switch (entity.getEntityType()) {
+            case CLUSTER:
+                clusters.add(entity.getName());
+                break;
+
+            case FEED:
+                Feed feed = (Feed) entity;
+                for (Cluster cluster : feed.getClusters().getClusters()) {
+                    clusters.add(cluster.getName());
+                }
+                break;
+
+            case PROCESS:
+                Process process = (Process) entity;
+                for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
+                    clusters.add(cluster.getName());
+                }
+                break;
+        }
+        return clusters;
+    }
+
+    public static Set<String> getClustersDefinedInColos(Entity entity) {
+        Set<String> entityClusters = EntityUtil.getClustersDefined(entity);
+        if (DeploymentUtil.isEmbeddedMode()) {
+            return entityClusters;
+        }
+
+        Set<String> myClusters = DeploymentUtil.getCurrentClusters();
+        Set<String> applicableClusters = new HashSet<String>();
+        for (String cluster : entityClusters) {
+            if (myClusters.contains(cluster)) {
+                applicableClusters.add(cluster);
+            }
+        }
+        return applicableClusters;
+    }
+
    /**
     * Builds the entity's staging path under the cluster's "staging" location.
     *
     * @throws FalconException wrapping any failure while resolving the path
     */
    public static Path getStagingPath(
            org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity)
            throws FalconException {
        try {
            return new Path(ClusterHelper.getLocation(cluster, "staging"),
                    EntityUtil.getStagingPath(entity));
        } catch (Exception e) {
            // NOTE(review): broad catch — presumably guards URI/path syntax errors; confirm.
            throw new FalconException(e);
        }
    }
+
+    public static Retry getRetry(Entity entity) throws FalconException {
+        switch (entity.getEntityType()) {
+            case FEED:
+                if (!RuntimeProperties.get()
+                        .getProperty("feed.retry.allowed", "true")
+                        .equalsIgnoreCase("true")) {
+                    return null;
+                }
+                Retry retry = new Retry();
+                retry.setAttempts(Integer.parseInt(RuntimeProperties.get()
+                        .getProperty("feed.retry.attempts", "3")));
+                retry.setDelay(new Frequency(RuntimeProperties.get().getProperty(
+                        "feed.retry.frequency", "minutes(5)")));
+                retry.setPolicy(PolicyType.fromValue(RuntimeProperties.get()
+                        .getProperty("feed.retry.policy", "exp-backoff")));
+                return retry;
+            case PROCESS:
+                Process process = (Process) entity;
+                return process.getRetry();
+            default:
+                throw new FalconException("Cannot create Retry for entity:" + entity.getName());
+        }
+    }
+
+    public static LateProcess getLateProcess(Entity entity)
+            throws FalconException {
+        switch (entity.getEntityType()) {
+            case FEED:
+                if (!RuntimeProperties.get()
+                        .getProperty("feed.late.allowed", "true")
+                        .equalsIgnoreCase("true")) {
+                    return null;
+                }
+                LateProcess lateProcess = new LateProcess();
+                lateProcess.setDelay(new Frequency(RuntimeProperties.get()
+                        .getProperty("feed.late.frequency", "hours(3)")));
+                lateProcess.setPolicy(PolicyType.fromValue(RuntimeProperties.get()
+                        .getProperty("feed.late.policy", "exp-backoff")));
+                LateInput lateInput = new LateInput();
+                lateInput.setInput(entity.getName());
+                //TODO - Assuming the late workflow is not used
+                lateInput.setWorkflowPath("ignore.xml");
+                lateProcess.getLateInputs().add(lateInput);
+                return lateProcess;
+            case PROCESS:
+                Process process = (Process) entity;
+                return process.getLateProcess();
+            default:
+                throw new FalconException("Cannot create Late Process for entity:" + entity.getName());
+        }
+    }
+
+    public static Path getLogPath(
+            org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity)
+            throws FalconException {
+        Path logPath = new Path(ClusterHelper.getLocation(cluster,
+                "staging"), EntityUtil.getStagingPath(entity) + "/../logs");
+        return logPath;
+    }
+
+    public static String UTCtoURIDate(String utc) throws FalconException {
+        DateFormat utcFormat = new SimpleDateFormat(
+                "yyyy'-'MM'-'dd'T'HH':'mm'Z'");
+        Date utcDate;
+        try {
+            utcDate = utcFormat.parse(utc);
+        } catch (ParseException e) {
+            throw new FalconException("Unable to parse utc date:", e);
+        }
+        DateFormat uriFormat = new SimpleDateFormat("yyyy'-'MM'-'dd'-'HH'-'mm");
+        return uriFormat.format(utcDate);
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/ExternalId.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ExternalId.java b/common/src/main/java/org/apache/falcon/entity/ExternalId.java
index 1b9f713..cb865d2 100644
--- a/common/src/main/java/org/apache/falcon/entity/ExternalId.java
+++ b/common/src/main/java/org/apache/falcon/entity/ExternalId.java
@@ -32,42 +32,43 @@ public class ExternalId {
     public ExternalId(String id) {
         this.id = id;
     }
-    
+
     public String getId() {
         return id;
     }
-    
+
     public ExternalId(String name, Tag tag, String elexpr) {
-        if(StringUtils.isEmpty(name) || tag == null || StringUtils.isEmpty(elexpr))
+        if (StringUtils.isEmpty(name) || tag == null || StringUtils.isEmpty(elexpr)) {
             throw new IllegalArgumentException("Empty inputs!");
-        
+        }
+
         id = name + SEPARATOR + tag.name() + SEPARATOR + elexpr;
     }
-    
+
     public ExternalId(String name, Tag tag, Date date) {
         this(name, tag, SchemaHelper.formatDateUTC(date));
     }
-    
+
     public String getName() {
         String[] parts = id.split(SEPARATOR);
         return parts[0];
     }
-    
+
     public Date getDate() throws FalconException {
-        return EntityUtil.parseDateUTC(getDateAsString());            
+        return EntityUtil.parseDateUTC(getDateAsString());
     }
-    
+
     public String getDateAsString() {
         String[] parts = id.split(SEPARATOR);
         return parts[2];
     }
-    
-	public Tag getTag() {
-		String[] parts = id.split(SEPARATOR);
-		return Tag.valueOf(parts[1]);
-	}
 
-	public String getDFSname() {
-		return id.replace(":", "-");
-	}
    /** Extracts the tag (second SEPARATOR-delimited field) from the id. */
    public Tag getTag() {
        String[] parts = id.split(SEPARATOR);
        return Tag.valueOf(parts[1]);
    }
+
    /** Returns a DFS-safe form of the id, with ':' replaced by '-'. */
    public String getDFSname() {
        return id.replace(":", "-");
    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index 2fcb7cc..4a39d8c 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -18,57 +18,54 @@
 
 package org.apache.falcon.entity;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.cluster.Property;
-import org.apache.falcon.entity.v0.feed.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.falcon.entity.v0.feed.*;
 import org.apache.falcon.expression.ExpressionHelper;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
 public class FeedHelper {
     public static Cluster getCluster(Feed feed, String clusterName) {
-        for(Cluster cluster:feed.getClusters().getClusters())
-            if(cluster.getName().equals(clusterName))
+        for (Cluster cluster : feed.getClusters().getClusters()) {
+            if (cluster.getName().equals(clusterName)) {
                 return cluster;
+            }
+        }
         return null;
     }
-    
-	public static Location getLocation(Feed feed, LocationType type,
-			String clusterName) {
-		Cluster cluster = getCluster(feed, clusterName);
-		if (cluster!=null &&cluster.getLocations() != null 
-				&& cluster.getLocations() .getLocations().size() != 0) {
-			return getLocation(cluster.getLocations() , type);
-		}
-		else{
-			return getLocation(feed.getLocations(), type);
-		}
 
-	}
-	
-	public static Location getLocation(Feed feed, LocationType type) {
-		return getLocation(feed.getLocations(), type);
-	}
+    public static Location getLocation(Feed feed, LocationType type,
+                                       String clusterName) {
+        Cluster cluster = getCluster(feed, clusterName);
+        if (cluster != null && cluster.getLocations() != null
+                && cluster.getLocations().getLocations().size() != 0) {
+            return getLocation(cluster.getLocations(), type);
+        } else {
+            return getLocation(feed.getLocations(), type);
+        }
+
+    }
+
+    public static Location getLocation(Feed feed, LocationType type) {
+        return getLocation(feed.getLocations(), type);
+    }
+
    /**
     * Finds the location of the given type in the list.
     * NOTE(review): when no location of that type exists, a synthetic
     * location pointing at "/tmp" is returned instead of null — callers
     * apparently rely on always getting a non-null location; confirm.
     */
    public static Location getLocation(Locations locations, LocationType type) {
        for (Location loc : locations.getLocations()) {
            if (loc.getType() == type) {
                return loc;
            }
        }
        Location loc = new Location();
        loc.setPath("/tmp");
        loc.setType(type);
        return loc;
    }
 
-	public static Location getLocation(Locations locations, LocationType type) {
-		for (Location loc : locations.getLocations()) {
-			if (loc.getType() == type) {
-				return loc;
-			}
-		}
-		Location loc = new Location();
-		loc.setPath("/tmp");
-		loc.setType(type);
-		return loc;
-	}
-    
     public static String normalizePartitionExpression(String part1, String part2) {
         String partExp = StringUtils.stripToEmpty(part1) + "/" + StringUtils.stripToEmpty(part2);
         partExp = partExp.replaceAll("//+", "/");
@@ -79,16 +76,17 @@ public class FeedHelper {
 
     public static String normalizePartitionExpression(String partition) {
         return normalizePartitionExpression(partition, null);
-    }    
-    
+    }
+
     private static Properties loadClusterProperties(org.apache.falcon.entity.v0.cluster.Cluster cluster) {
         Properties properties = new Properties();
         Map<String, String> clusterVars = new HashMap<String, String>();
         clusterVars.put("colo", cluster.getColo());
         clusterVars.put("name", cluster.getName());
         if (cluster.getProperties() != null) {
-            for (Property property : cluster.getProperties().getProperties())
+            for (Property property : cluster.getProperties().getProperties()) {
                 clusterVars.put(property.getName(), property.getValue());
+            }
         }
         properties.put("cluster", clusterVars);
         return properties;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
index ca72c21..598f26e 100644
--- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
@@ -23,9 +23,11 @@ import org.apache.falcon.entity.v0.process.Process;
 
 public class ProcessHelper {
     public static Cluster getCluster(Process process, String clusterName) {
-        for(Cluster cluster:process.getClusters().getClusters())
-            if(cluster.getName().equals(clusterName))
+        for (Cluster cluster : process.getClusters().getClusters()) {
+            if (cluster.getName().equals(clusterName)) {
                 return cluster;
+            }
+        }
         return null;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java b/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
index 93e728a..4a636cd 100644
--- a/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
+++ b/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
@@ -17,107 +17,107 @@
  */
 package org.apache.falcon.entity;
 
+import org.apache.falcon.Pair;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.v0.Entity;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.falcon.Pair;
-import org.apache.falcon.Tag;
-import org.apache.falcon.entity.v0.Entity;
-
 public class WorkflowNameBuilder<T extends Entity> {
-	private static final String PREFIX = "FALCON";
-
-	T entity;
-	private Tag tag;
-	private List<String> suffixes;
-
-	public WorkflowNameBuilder(T entity) {
-		this.entity = entity;
-	}
-
-	public void setTag(Tag tag) {
-		this.tag = tag;
-	}
-
-	public void setSuffixes(List<String> suffixes) {
-		this.suffixes = suffixes;
-	}
-
-	public WorkflowName getWorkflowName() {
-		return new WorkflowName(PREFIX, entity.getEntityType().name(),
-				tag == null ? null : tag.name(), entity.getName(),
-				suffixes == null ? new ArrayList<String>() : suffixes);
-	}
-
-	public Tag getWorkflowTag(String workflowName) {
-		return WorkflowName.getTagAndSuffixes(entity, workflowName) == null ? null
-				: WorkflowName.getTagAndSuffixes(entity, workflowName).first;
-	}
-
-	public String getWorkflowSuffixes(String workflowName) {
-		return WorkflowName.getTagAndSuffixes(entity, workflowName) == null ? ""
-				: WorkflowName.getTagAndSuffixes(entity, workflowName).second;
-	}
-
-	public static class WorkflowName {
-		private static final String SEPARATOR = "_";
-
-		private String prefix;
-		private String entityType;
-		private String tag;
-		private String entityName;
-		private List<String> suffixes;
-
-		public WorkflowName(String prefix, String entityType, String tag,
-				String entityName, List<String> suffixes) {
-			this.prefix = prefix;
-			this.entityType = entityType;
-			this.tag = tag;
-			this.entityName = entityName;
-			this.suffixes = suffixes;
-		}
-
-		@Override
-		public String toString() {
-			StringBuilder builder = new StringBuilder();
-			builder.append(prefix).append(SEPARATOR).append(entityType)
-					.append(tag == null ? "" : SEPARATOR + tag)
-					.append(SEPARATOR).append(entityName);
-
-			for (String suffix : suffixes) {
-				builder.append(SEPARATOR).append(suffix);
-			}
-
-			return builder.toString();
-		}
-
-		public static Pair<Tag, String> getTagAndSuffixes(Entity entity,
-				String workflowName) {
-
-			StringBuilder namePattern = new StringBuilder(PREFIX + SEPARATOR
-					+ entity.getEntityType().name() + SEPARATOR + "(");
-			for (Tag tag : Tag.values()) {
-				namePattern.append(tag.name());
-				namePattern.append("|");
-			}
-			namePattern = namePattern.deleteCharAt(namePattern.length()-1);
-			namePattern.append(")" + SEPARATOR + entity.getName()
-					+ "([_A-Za-z0-9-.]*)");
-			
-			Pattern pattern = Pattern.compile(namePattern.toString());
-
-			Matcher matcher = pattern.matcher(workflowName);
-			if (matcher.matches()) {
-				matcher.reset();
-				if (matcher.find()) {
-					String tag = matcher.group(1);
-					String suffixes = matcher.group(2);
-					return new Pair<Tag, String>(Tag.valueOf(tag), suffixes);
-				}
-			}
-			return null;
-		}
-	}
+    private static final String PREFIX = "FALCON";
+
+    T entity;
+    private Tag tag;
+    private List<String> suffixes;
+
+    public WorkflowNameBuilder(T entity) {
+        this.entity = entity;
+    }
+
+    public void setTag(Tag tag) {
+        this.tag = tag;
+    }
+
+    public void setSuffixes(List<String> suffixes) {
+        this.suffixes = suffixes;
+    }
+
+    public WorkflowName getWorkflowName() {
+        return new WorkflowName(PREFIX, entity.getEntityType().name(),
+                tag == null ? null : tag.name(), entity.getName(),
+                suffixes == null ? new ArrayList<String>() : suffixes);
+    }
+
+    public Tag getWorkflowTag(String workflowName) {
+        return WorkflowName.getTagAndSuffixes(entity, workflowName) == null ? null
+                : WorkflowName.getTagAndSuffixes(entity, workflowName).first;
+    }
+
+    public String getWorkflowSuffixes(String workflowName) {
+        return WorkflowName.getTagAndSuffixes(entity, workflowName) == null ? ""
+                : WorkflowName.getTagAndSuffixes(entity, workflowName).second;
+    }
+
+    public static class WorkflowName {
+        private static final String SEPARATOR = "_";
+
+        private String prefix;
+        private String entityType;
+        private String tag;
+        private String entityName;
+        private List<String> suffixes;
+
+        public WorkflowName(String prefix, String entityType, String tag,
+                            String entityName, List<String> suffixes) {
+            this.prefix = prefix;
+            this.entityType = entityType;
+            this.tag = tag;
+            this.entityName = entityName;
+            this.suffixes = suffixes;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append(prefix).append(SEPARATOR).append(entityType)
+                    .append(tag == null ? "" : SEPARATOR + tag)
+                    .append(SEPARATOR).append(entityName);
+
+            for (String suffix : suffixes) {
+                builder.append(SEPARATOR).append(suffix);
+            }
+
+            return builder.toString();
+        }
+
+        public static Pair<Tag, String> getTagAndSuffixes(Entity entity,
+                                                          String workflowName) {
+
+            StringBuilder namePattern = new StringBuilder(PREFIX + SEPARATOR
+                    + entity.getEntityType().name() + SEPARATOR + "(");
+            for (Tag tag : Tag.values()) {
+                namePattern.append(tag.name());
+                namePattern.append("|");
+            }
+            namePattern = namePattern.deleteCharAt(namePattern.length() - 1);
+            namePattern.append(")" + SEPARATOR + entity.getName()
+                    + "([_A-Za-z0-9-.]*)");
+
+            Pattern pattern = Pattern.compile(namePattern.toString());
+
+            Matcher matcher = pattern.matcher(workflowName);
+            if (matcher.matches()) {
+                matcher.reset();
+                if (matcher.find()) {
+                    String tag = matcher.group(1);
+                    String suffixes = matcher.group(2);
+                    return new Pair<Tag, String>(Tag.valueOf(tag), suffixes);
+                }
+            }
+            return null;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/common/Configuration.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/common/Configuration.java b/common/src/main/java/org/apache/falcon/entity/common/Configuration.java
index 1f17672..0e7e8f1 100644
--- a/common/src/main/java/org/apache/falcon/entity/common/Configuration.java
+++ b/common/src/main/java/org/apache/falcon/entity/common/Configuration.java
@@ -25,49 +25,49 @@ import java.util.concurrent.ConcurrentHashMap;
 
 public class Configuration implements Iterable<Map.Entry<String, String>>, Cloneable {
 
-  private final Map<String, String> properties;
+    private final Map<String, String> properties;
 
-  public Configuration() {
-    properties = new ConcurrentHashMap<String, String>();
-  }
+    public Configuration() {
+        properties = new ConcurrentHashMap<String, String>();
+    }
 
-  public Configuration(Map<String, String> properties) {
-    this.properties = properties;
-  }
+    public Configuration(Map<String, String> properties) {
+        this.properties = properties;
+    }
 
-  public void addConfiguration(Configuration config) {
-    for (Entry<String, String> entry : config) {
-      properties.put(entry.getKey(), entry.getValue());
+    public void addConfiguration(Configuration config) {
+        for (Entry<String, String> entry : config) {
+            properties.put(entry.getKey(), entry.getValue());
+        }
     }
-  }
 
-  public Configuration addAndReturnNewConfiguration(Configuration config) {
-    Map<String, String> newProperties = new ConcurrentHashMap<String, String>(properties);
-    for (Entry<String, String> entry : config) {
-      newProperties.put(entry.getKey(), entry.getValue());
+    public Configuration addAndReturnNewConfiguration(Configuration config) {
+        Map<String, String> newProperties = new ConcurrentHashMap<String, String>(properties);
+        for (Entry<String, String> entry : config) {
+            newProperties.put(entry.getKey(), entry.getValue());
+        }
+        return new Configuration(newProperties);
     }
-    return new Configuration(newProperties);
-  }
 
-  public String getConf(String name) {
-    return properties.get(name);
-  }
+    public String getConf(String name) {
+        return properties.get(name);
+    }
 
-  public void setConf(String name, String value) {
-    properties.put(name, value);
-  }
+    public void setConf(String name, String value) {
+        properties.put(name, value);
+    }
 
-  public void setConf(String name, String value, String defaultValue) {
-    if (value == null) {
-      properties.put(name, defaultValue);
-    } else {
-      properties.put(name, value);
+    public void setConf(String name, String value, String defaultValue) {
+        if (value == null) {
+            properties.put(name, defaultValue);
+        } else {
+            properties.put(name, value);
+        }
     }
-  }
 
-  @Override
-  public Iterator<Entry<String, String>> iterator() {
-    return properties.entrySet().iterator();
-  }
+    @Override
+    public Iterator<Entry<String, String>> iterator() {
+        return properties.entrySet().iterator();
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java b/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
index 1d77b69..470c98a 100644
--- a/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
+++ b/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
@@ -21,36 +21,36 @@ import java.util.regex.Pattern;
 
 public class FeedDataPath {
 
-	public static enum VARS {
-		YEAR("yyyy"), MONTH("MM"), DAY("dd"), HOUR("HH"), MINUTE("mm");
-
-		private final Pattern pattern;
-		private final String datePattern;
-
-		private VARS(String datePattern) {
-			pattern = Pattern.compile("\\$\\{" + name() + "\\}");
-			this.datePattern = datePattern;
-		}
-
-		public String regex() {
-			return pattern.pattern();
-		}
-
-		public static VARS from(String str) {
-			for (VARS var : VARS.values()) {
-				if (var.datePattern.equals(str)) {
-					return var;
-				}
-			}
-			return null;
-		}
-	}
-
-	public static final Pattern PATTERN = Pattern.compile(VARS.YEAR.regex()
-			+ "|" + VARS.MONTH.regex() + "|" + VARS.DAY.regex() + "|"
-			+ VARS.HOUR.regex() + "|" + VARS.MINUTE.regex());
-
-	public static final Pattern DATE_FIELD_PATTERN = Pattern
-			.compile("yyyy|MM|dd|HH|mm");
    /** Date-field variables (e.g. ${YEAR}) that may appear in feed path templates. */
    public static enum VARS {
        YEAR("yyyy"), MONTH("MM"), DAY("dd"), HOUR("HH"), MINUTE("mm");

        // Regex matching the literal ${NAME} token in a path template.
        private final Pattern pattern;
        // The date-format field this variable corresponds to.
        private final String datePattern;

        private VARS(String datePattern) {
            pattern = Pattern.compile("\\$\\{" + name() + "\\}");
            this.datePattern = datePattern;
        }

        /** Returns the regex source that matches this variable's ${NAME} token. */
        public String regex() {
            return pattern.pattern();
        }

        /** Looks up the variable whose date pattern equals str, or null if none. */
        public static VARS from(String str) {
            for (VARS var : VARS.values()) {
                if (var.datePattern.equals(str)) {
                    return var;
                }
            }
            return null;
        }
    }

    /** Matches any of the ${YEAR}..${MINUTE} variables in a path template. */
    public static final Pattern PATTERN = Pattern.compile(VARS.YEAR.regex()
            + "|" + VARS.MONTH.regex() + "|" + VARS.DAY.regex() + "|"
            + VARS.HOUR.regex() + "|" + VARS.MINUTE.regex());

    /** Matches the corresponding raw date-format fields (yyyy, MM, ...). */
    public static final Pattern DATE_FIELD_PATTERN = Pattern
            .compile("yyyy|MM|dd|HH|mm");
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
index 7fd92b6..c6ef988 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
@@ -18,13 +18,13 @@
 
 package org.apache.falcon.entity.parser;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.store.StoreAccessException;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
 public class ClusterEntityParser extends EntityParser<Cluster> {
@@ -36,22 +36,22 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
     }
 
     @Override
-	public void validate(Cluster cluster) throws StoreAccessException,
-			ValidationException { 
-		if (new Path(ClusterHelper.getStorageUrl(cluster)).toUri().getScheme()==null) {
-			throw new ValidationException(
-					"Cannot get valid scheme for namenode from write interface of cluster: "
-							+ cluster.getName());
-		}
-		try {
-			Configuration conf = new Configuration();
-			conf.set("fs.default.name", ClusterHelper.getStorageUrl(cluster));
-			conf.setInt("ipc.client.connect.max.retries", 10);
-			FileSystem.get(conf);
-		} catch (Exception e) {
-			throw new ValidationException("Invalid HDFS server or port:"
-					+ ClusterHelper.getStorageUrl(cluster), e);
-		}
-	}
    /**
     * Validates that the cluster's storage URL has a usable scheme and that
     * the filesystem behind it is reachable.
     *
     * @throws ValidationException when the scheme is missing or the
     *                             filesystem cannot be contacted
     */
    public void validate(Cluster cluster) throws StoreAccessException,
                                                 ValidationException {
        // fs.default.name below requires an explicit scheme (e.g. hdfs://).
        if (new Path(ClusterHelper.getStorageUrl(cluster)).toUri().getScheme() == null) {
            throw new ValidationException(
                    "Cannot get valid scheme for namenode from write interface of cluster: "
                            + cluster.getName());
        }
        try {
            // Probe the filesystem to verify the namenode host/port is valid.
            Configuration conf = new Configuration();
            conf.set("fs.default.name", ClusterHelper.getStorageUrl(cluster));
            conf.setInt("ipc.client.connect.max.retries", 10);
            FileSystem.get(conf);
        } catch (Exception e) {
            // NOTE(review): broad catch — any failure is reported as an invalid server; confirm intent.
            throw new ValidationException("Invalid HDFS server or port:"
                    + ClusterHelper.getStorageUrl(cluster), e);
        }
    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/parser/CrossEntityValidations.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/CrossEntityValidations.java b/common/src/main/java/org/apache/falcon/entity/parser/CrossEntityValidations.java
index 693e075..09aed24 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/CrossEntityValidations.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/CrossEntityValidations.java
@@ -18,19 +18,16 @@
 
 package org.apache.falcon.entity.parser;
 
-import java.util.Date;
-
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.ProcessHelper;
 import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Cluster;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.*;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.entity.v0.process.Validity;
 import org.apache.falcon.expression.ExpressionHelper;
 
+import java.util.Date;
+
 public final class CrossEntityValidations {
 
     public static void validateInstanceRange(Process process, Input input, Feed feed) throws FalconException {
@@ -38,7 +35,8 @@ public final class CrossEntityValidations {
         try {
             for (Cluster cluster : process.getClusters().getClusters()) {
                 String clusterName = cluster.getName();
-                org.apache.falcon.entity.v0.feed.Validity feedValidity = FeedHelper.getCluster(feed, clusterName).getValidity();
+                org.apache.falcon.entity.v0.feed.Validity feedValidity = FeedHelper.getCluster(feed,
+                        clusterName).getValidity();
                 Date feedStart = feedValidity.getStart();
                 Date feedEnd = feedValidity.getEnd();
 
@@ -49,22 +47,28 @@ public final class CrossEntityValidations {
                 Validity processValidity = ProcessHelper.getCluster(process, clusterName).getValidity();
                 ExpressionHelper.setReferenceDate(processValidity.getStart());
                 Date instStart = evaluator.evaluate(instStartEL, Date.class);
-                if (instStart.before(feedStart))
+                if (instStart.before(feedStart)) {
                     throw new ValidationException("Start instance  " + instStartEL + " of feed " + feed.getName()
-                            + " is before the start of feed " + feedValidity.getStart() + " for cluster " + clusterName);
+                            + " is before the start of feed " + feedValidity.getStart() + " for cluster "
+                            + clusterName);
+                }
 
                 Date instEnd = evaluator.evaluate(instEndEL, Date.class);
-                if (instEnd.after(feedEnd))
+                if (instEnd.after(feedEnd)) {
                     throw new ValidationException("End instance  " + instEndEL + " of feed " + feed.getName()
-                            + " is before the start of feed " + feedValidity.getStart() + " for cluster " + clusterName);
+                            + " is before the start of feed " + feedValidity.getStart() + " for cluster "
+                            + clusterName);
+                }
 
-                if (instEnd.before(instStart))
+                if (instEnd.before(instStart)) {
                     throw new ValidationException("End instance " + instEndEL + " for feed " + feed.getName()
                             + " is before the start instance " + instStartEL + " for cluster " + clusterName);
+                }
 
-                if (instEnd.after(feedEnd))
+                if (instEnd.after(feedEnd)) {
                     throw new ValidationException("End instance " + instEndEL + " for feed " + feed.getName()
                             + " is after the end of feed " + feedValidity.getEnd() + " for cluster " + clusterName);
+                }
             }
         } catch (ValidationException e) {
             throw e;
@@ -73,7 +77,8 @@ public final class CrossEntityValidations {
         }
     }
 
-    public static void validateFeedRetentionPeriod(String startInstance, Feed feed, String clusterName) throws FalconException {
+    public static void validateFeedRetentionPeriod(String startInstance, Feed feed, String clusterName)
+            throws FalconException {
         String feedRetention = FeedHelper.getCluster(feed, clusterName).getRetention().getLimit().toString();
         ExpressionHelper evaluator = ExpressionHelper.get();
 
@@ -95,7 +100,8 @@ public final class CrossEntityValidations {
         try {
             for (Cluster cluster : process.getClusters().getClusters()) {
                 String clusterName = cluster.getName();
-                org.apache.falcon.entity.v0.feed.Validity feedValidity = FeedHelper.getCluster(feed, clusterName).getValidity();
+                org.apache.falcon.entity.v0.feed.Validity feedValidity = FeedHelper.getCluster(feed,
+                        clusterName).getValidity();
                 Date feedStart = feedValidity.getStart();
                 Date feedEnd = feedValidity.getEnd();
 
@@ -104,13 +110,15 @@ public final class CrossEntityValidations {
                 Validity processValidity = ProcessHelper.getCluster(process, clusterName).getValidity();
                 ExpressionHelper.setReferenceDate(processValidity.getStart());
                 Date inst = evaluator.evaluate(instEL, Date.class);
-                if (inst.before(feedStart))
+                if (inst.before(feedStart)) {
                     throw new ValidationException("Instance  " + instEL + " of feed " + feed.getName()
                             + " is before the start of feed " + feedValidity.getStart() + " for cluster" + clusterName);
+                }
 
-                if (inst.after(feedEnd))
+                if (inst.after(feedEnd)) {
                     throw new ValidationException("End instance " + instEL + " for feed " + feed.getName()
                             + " is after the end of feed " + feedValidity.getEnd() + " for cluster" + clusterName);
+                }
             }
         } catch (ValidationException e) {
             throw e;
@@ -122,12 +130,14 @@ public final class CrossEntityValidations {
     public static void validateInputPartition(Input input, Feed feed) throws ValidationException {
         String[] parts = input.getPartition().split("/");
         if (feed.getPartitions() == null || feed.getPartitions().getPartitions().isEmpty()
-                || feed.getPartitions().getPartitions().size() < parts.length)
+                || feed.getPartitions().getPartitions().size() < parts.length) {
             throw new ValidationException("Partition specification in input " + input.getName() + " is wrong");
+        }
     }
 
     public static void validateFeedDefinedForCluster(Feed feed, String clusterName) throws FalconException {
-        if (FeedHelper.getCluster(feed, clusterName) == null)
+        if (FeedHelper.getCluster(feed, clusterName) == null) {
             throw new ValidationException("Feed " + feed.getName() + " is not defined for cluster " + clusterName);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
index 52d086c..6b06cc4 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
@@ -18,12 +18,6 @@
 
 package org.apache.falcon.entity.parser;
 
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.util.List;
-
-import javax.xml.bind.Unmarshaller;
-
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Pair;
 import org.apache.falcon.entity.store.ConfigurationStore;
@@ -31,11 +25,15 @@ import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.log4j.Logger;
 
+import javax.xml.bind.Unmarshaller;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.List;
+
 /**
- * 
  * Generic Abstract Entity Parser, the concrete FEED, PROCESS and CLUSTER
  * Should extend this parser to implement specific parsing.
- * 
+ *
  * @param <T>
  */
 public abstract class EntityParser<T extends Entity> {
@@ -46,9 +44,8 @@ public abstract class EntityParser<T extends Entity> {
 
     /**
      * Constructor
-     * 
-     * @param entityType
-     *            - can be FEED or PROCESS
+     *
+     * @param entityType - can be FEED or PROCESS
      */
     protected EntityParser(EntityType entityType) {
         this.entityType = entityType;
@@ -60,9 +57,8 @@ public abstract class EntityParser<T extends Entity> {
 
     /**
      * Parses a sent XML and validates it using JAXB.
-     * 
-     * @param xmlString
-     *            - Entity XML
+     *
+     * @param xmlString - Entity XML
      * @return Entity - JAVA Object
      * @throws FalconException
      */
@@ -71,10 +67,10 @@ public abstract class EntityParser<T extends Entity> {
         Entity entity = parseAndValidate(inputStream);
         return entity;
     }
-    
+
     /**
      * Parses xml stream
-     * 
+     *
      * @param xmlStream
      * @return entity
      * @throws FalconException
@@ -86,7 +82,7 @@ public abstract class EntityParser<T extends Entity> {
             T entity = (T) unmarshaller.unmarshal(xmlStream);
             LOG.info("Parsed Entity: " + entity.getName());
             return entity;
-        } catch(Exception e) {
+        } catch (Exception e) {
             throw new FalconException(e);
         }
     }
@@ -96,19 +92,20 @@ public abstract class EntityParser<T extends Entity> {
         validate(entity);
         return entity;
     }
-    
+
     protected void validateEntityExists(EntityType type, String name) throws FalconException {
-        if(ConfigurationStore.get().get(type, name) == null)
-            throw new ValidationException("Referenced " + type + " " + name + " is not registered");        
+        if (ConfigurationStore.get().get(type, name) == null) {
+            throw new ValidationException("Referenced " + type + " " + name + " is not registered");
+        }
     }
-    
+
     protected void validateEntitiesExist(List<Pair<EntityType, String>> entities) throws FalconException {
-        if(entities != null) {
-            for(Pair<EntityType, String> entity:entities) {
+        if (entities != null) {
+            for (Pair<EntityType, String> entity : entities) {
                 validateEntityExists(entity.first, entity.second);
             }
         }
     }
-    
+
     public abstract void validate(T entity) throws FalconException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java b/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java
index e9369e2..43542c3 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java
@@ -21,35 +21,33 @@ package org.apache.falcon.entity.parser;
 import org.apache.falcon.entity.v0.EntityType;
 
 /**
- * 
  * Factory Class which returns the Parser based on the EntityType.
- * 
  */
 public final class EntityParserFactory {
 
-	private EntityParserFactory() {
-	}
+    private EntityParserFactory() {
+    }
 
-	/**
-	 * Tie EnityType with the Entity Class in one place so that it can be
-	 * unmarshalled easily by concrete classes based on the class type using
-	 * JAXB.
-	 * 
-	 * @param entityType
-	 * @return concrete parser based on entity type
-	 */
-	public static EntityParser getParser(final EntityType entityType) {
+    /**
+     * Tie EnityType with the Entity Class in one place so that it can be
+     * unmarshalled easily by concrete classes based on the class type using
+     * JAXB.
+     *
+     * @param entityType
+     * @return concrete parser based on entity type
+     */
+    public static EntityParser getParser(final EntityType entityType) {
 
-		switch (entityType) {
-		case PROCESS:
-			return new ProcessEntityParser();
-		case FEED:
-			return new FeedEntityParser();
-		case CLUSTER:
-			return new ClusterEntityParser();
-		default:
-			throw new IllegalArgumentException("Unhandled entity type: " + entityType);
-		}
-	}
+        switch (entityType) {
+            case PROCESS:
+                return new ProcessEntityParser();
+            case FEED:
+                return new FeedEntityParser();
+            case CLUSTER:
+                return new ClusterEntityParser();
+            default:
+                throw new IllegalArgumentException("Unhandled entity type: " + entityType);
+        }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index 52a0804..475384e 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -18,11 +18,6 @@
 
 package org.apache.falcon.entity.parser;
 
-import java.util.Date;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.TimeZone;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.EntityUtil;
@@ -43,6 +38,11 @@ import org.apache.falcon.group.FeedGroup;
 import org.apache.falcon.group.FeedGroupMap;
 import org.apache.log4j.Logger;
 
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TimeZone;
+
 public class FeedEntityParser extends EntityParser<Feed> {
 
     private static final Logger LOG = Logger.getLogger(FeedEntityParser.class);
@@ -53,15 +53,18 @@ public class FeedEntityParser extends EntityParser<Feed> {
 
     @Override
     public void validate(Feed feed) throws FalconException {
-        if(feed.getTimezone() == null)
+        if (feed.getTimezone() == null) {
             feed.setTimezone(TimeZone.getTimeZone("UTC"));
-        
-        if (feed.getClusters() == null)
+        }
+
+        if (feed.getClusters() == null) {
             throw new ValidationException("Feed should have atleast one cluster");
+        }
 
         for (Cluster cluster : feed.getClusters().getClusters()) {
             validateEntityExists(EntityType.CLUSTER, cluster.getName());
-            validateClusterValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd(), cluster.getName());
+            validateClusterValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd(),
+                    cluster.getName());
             validateFeedCutOffPeriod(feed, cluster);
         }
 
@@ -72,8 +75,9 @@ public class FeedEntityParser extends EntityParser<Feed> {
         // But is this an update ?
 
         Feed oldFeed = ConfigurationStore.get().get(EntityType.FEED, feed.getName());
-        if (oldFeed == null)
+        if (oldFeed == null) {
             return; // Not an update case
+        }
 
         // Is actually an update. Need to iterate over all the processes
         // depending on this feed and see if they are valid with the new
@@ -81,8 +85,9 @@ public class FeedEntityParser extends EntityParser<Feed> {
         EntityGraph graph = EntityGraph.get();
         Set<Entity> referenced = graph.getDependents(oldFeed);
         Set<Process> processes = findProcesses(referenced);
-        if (processes.isEmpty())
+        if (processes.isEmpty()) {
             return;
+        }
 
         ensureValidityFor(feed, processes);
     }
@@ -98,33 +103,35 @@ public class FeedEntityParser extends EntityParser<Feed> {
     }
 
     private void validateFeedGroups(Feed feed) throws ValidationException {
-        String[] groupNames = feed.getGroups() != null ? feed.getGroups().split(",") : new String[] {};
+        String[] groupNames = feed.getGroups() != null ? feed.getGroups().split(",") : new String[]{};
         String defaultPath = FeedHelper.getLocation(feed, LocationType.DATA)
-		.getPath();
-		for (Cluster cluster : feed.getClusters().getClusters()) {
-			if (!FeedGroup.getDatePattern(
-					FeedHelper.getLocation(feed, LocationType.DATA,
-							cluster.getName()).getPath()).equals(
-					FeedGroup.getDatePattern(defaultPath))) {
-				throw new ValidationException("Feeds default path pattern: "
-						+ FeedHelper.getLocation(feed, LocationType.DATA)
-								.getPath()
-						+ ", does not match with cluster: "
-						+ cluster.getName()
-						+ " path pattern: "
-						+ FeedHelper.getLocation(feed, LocationType.DATA,
-								cluster.getName()).getPath());
-			}
-		}
+                .getPath();
+        for (Cluster cluster : feed.getClusters().getClusters()) {
+            if (!FeedGroup.getDatePattern(
+                    FeedHelper.getLocation(feed, LocationType.DATA,
+                            cluster.getName()).getPath()).equals(
+                    FeedGroup.getDatePattern(defaultPath))) {
+                throw new ValidationException("Feeds default path pattern: "
+                        + FeedHelper.getLocation(feed, LocationType.DATA)
+                        .getPath()
+                        + ", does not match with cluster: "
+                        + cluster.getName()
+                        + " path pattern: "
+                        + FeedHelper.getLocation(feed, LocationType.DATA,
+                        cluster.getName()).getPath());
+            }
+        }
         for (String groupName : groupNames) {
             FeedGroup group = FeedGroupMap.get().getGroupsMapping().get(groupName);
             if (group == null || group.canContainFeed(feed)) {
                 continue;
             } else {
-                throw new ValidationException("Feed " + feed.getName() + "'s frequency: " + feed.getFrequency().toString()
-                        + ", path pattern: " + FeedHelper.getLocation(feed, LocationType.DATA).getPath()
-                        + " does not match with group: " + group.getName() + "'s frequency: " + group.getFrequency()
-                        + ", date pattern: " + group.getDatePattern());
+                throw new ValidationException(
+                        "Feed " + feed.getName() + "'s frequency: " + feed.getFrequency().toString()
+                                + ", path pattern: " + FeedHelper.getLocation(feed, LocationType.DATA).getPath()
+                                + " does not match with group: " + group.getName() + "'s frequency: "
+                                + group.getFrequency()
+                                + ", date pattern: " + group.getDatePattern());
             }
         }
     }
@@ -134,8 +141,9 @@ public class FeedEntityParser extends EntityParser<Feed> {
             try {
                 ensureValidityFor(newFeed, process);
             } catch (FalconException e) {
-                throw new ValidationException("Process " + process.getName() + " is not compatible " + "with changes to feed "
-                        + newFeed.getName(), e);
+                throw new ValidationException(
+                        "Process " + process.getName() + " is not compatible " + "with changes to feed "
+                                + newFeed.getName(), e);
             }
         }
     }
@@ -145,8 +153,9 @@ public class FeedEntityParser extends EntityParser<Feed> {
             String clusterName = cluster.getName();
             if (process.getInputs() != null) {
                 for (Input input : process.getInputs().getInputs()) {
-                    if (!input.getFeed().equals(newFeed.getName()))
+                    if (!input.getFeed().equals(newFeed.getName())) {
                         continue;
+                    }
                     CrossEntityValidations.validateFeedDefinedForCluster(newFeed, clusterName);
                     CrossEntityValidations.validateFeedRetentionPeriod(input.getStart(), newFeed, clusterName);
                     CrossEntityValidations.validateInstanceRange(process, input, newFeed);
@@ -159,13 +168,15 @@ public class FeedEntityParser extends EntityParser<Feed> {
 
             if (process.getOutputs() != null) {
                 for (Output output : process.getOutputs().getOutputs()) {
-                    if (!output.getFeed().equals(newFeed.getName()))
+                    if (!output.getFeed().equals(newFeed.getName())) {
                         continue;
+                    }
                     CrossEntityValidations.validateFeedDefinedForCluster(newFeed, clusterName);
                     CrossEntityValidations.validateInstance(process, output, newFeed);
                 }
             }
-            LOG.debug("Verified and found " + process.getName() + " to be valid for new definition of " + newFeed.getName());
+            LOG.debug("Verified and found " + process.getName() + " to be valid for new definition of "
+                    + newFeed.getName());
         }
     }
 
@@ -188,59 +199,66 @@ public class FeedEntityParser extends EntityParser<Feed> {
         String feedRetention = cluster.getRetention().getLimit().toString();
         long retentionPeriod = evaluator.evaluate(feedRetention, Long.class);
 
-        if(feed.getLateArrival()==null){
-        	LOG.debug("Feed's late arrival cut-off not set");
-        	return;
+        if (feed.getLateArrival() == null) {
+            LOG.debug("Feed's late arrival cut-off not set");
+            return;
         }
         String feedCutoff = feed.getLateArrival().getCutOff().toString();
         long feedCutOffPeriod = evaluator.evaluate(feedCutoff, Long.class);
 
         if (retentionPeriod < feedCutOffPeriod) {
-            throw new ValidationException("Feed's retention limit: " + feedRetention + " of referenced cluster " + cluster.getName()
-                    + " should be more than feed's late arrival cut-off period: " + feedCutoff + " for feed: " + feed.getName());
+            throw new ValidationException(
+                    "Feed's retention limit: " + feedRetention + " of referenced cluster " + cluster.getName()
+                            + " should be more than feed's late arrival cut-off period: " + feedCutoff + " for feed: "
+                            + feed.getName());
         }
     }
-    
+
     private void validateFeedPartitionExpression(Feed feed) throws FalconException {
         int numSourceClusters = 0, numTrgClusters = 0;
         Set<String> clusters = new HashSet<String>();
         for (Cluster cl : feed.getClusters().getClusters()) {
-			if (!clusters.add(cl.getName())) {
-				throw new ValidationException("Cluster: " + cl.getName()
-						+ " is defined more than once for feed: "+feed.getName());
-			}
-            if (cl.getType() == ClusterType.SOURCE){
+            if (!clusters.add(cl.getName())) {
+                throw new ValidationException("Cluster: " + cl.getName()
+                        + " is defined more than once for feed: " + feed.getName());
+            }
+            if (cl.getType() == ClusterType.SOURCE) {
                 numSourceClusters++;
-            } else if(cl.getType() == ClusterType.TARGET) {
+            } else if (cl.getType() == ClusterType.TARGET) {
                 numTrgClusters++;
             }
         }
-        
-		if (numTrgClusters >= 1 && numSourceClusters == 0) {
-			throw new ValidationException("Feed: " + feed.getName()
-					+ " should have atleast one source cluster defined");
-		}
-        
+
+        if (numTrgClusters >= 1 && numSourceClusters == 0) {
+            throw new ValidationException("Feed: " + feed.getName()
+                    + " should have atleast one source cluster defined");
+        }
+
         int feedParts = feed.getPartitions() != null ? feed.getPartitions().getPartitions().size() : 0;
-        
-        for(Cluster cluster:feed.getClusters().getClusters()) {
 
-            if(cluster.getType() == ClusterType.SOURCE && numSourceClusters > 1 && numTrgClusters >= 1) {
+        for (Cluster cluster : feed.getClusters().getClusters()) {
+
+            if (cluster.getType() == ClusterType.SOURCE && numSourceClusters > 1 && numTrgClusters >= 1) {
                 String part = FeedHelper.normalizePartitionExpression(cluster.getPartition());
-                if(StringUtils.split(part, '/').length == 0)
-                    throw new ValidationException("Partition expression has to be specified for cluster " + cluster.getName() +
-                            " as there are more than one source clusters");
+                if (StringUtils.split(part, '/').length == 0) {
+                    throw new ValidationException(
+                            "Partition expression has to be specified for cluster " + cluster.getName() +
+                                    " as there are more than one source clusters");
+                }
                 validateClusterExpDefined(cluster);
 
-            } else if(cluster.getType() == ClusterType.TARGET) {
+            } else if (cluster.getType() == ClusterType.TARGET) {
 
-                for(Cluster src:feed.getClusters().getClusters()) {
-                    if(src.getType() == ClusterType.SOURCE) {
-                        String part = FeedHelper.normalizePartitionExpression(src.getPartition(), cluster.getPartition());
+                for (Cluster src : feed.getClusters().getClusters()) {
+                    if (src.getType() == ClusterType.SOURCE) {
+                        String part = FeedHelper.normalizePartitionExpression(src.getPartition(),
+                                cluster.getPartition());
                         int numParts = StringUtils.split(part, '/').length;
-                        if(numParts > feedParts)
-                            throw new ValidationException("Partition for " + src.getName() + " and " + cluster.getName() + 
-                                    "clusters is more than the number of partitions defined in feed");
+                        if (numParts > feedParts) {
+                            throw new ValidationException(
+                                    "Partition for " + src.getName() + " and " + cluster.getName() +
+                                            "clusters is more than the number of partitions defined in feed");
+                        }
                     }
                 }
 
@@ -252,12 +270,15 @@ public class FeedEntityParser extends EntityParser<Feed> {
     }
 
     private void validateClusterExpDefined(Cluster cl) throws FalconException {
-        if(cl.getPartition() == null)
+        if (cl.getPartition() == null) {
             return;
-        
+        }
+
         org.apache.falcon.entity.v0.cluster.Cluster cluster = EntityUtil.getEntity(EntityType.CLUSTER, cl.getName());
         String part = FeedHelper.normalizePartitionExpression(cl.getPartition());
-        if(FeedHelper.evaluateClusterExp(cluster, part).equals(part))
-            throw new ValidationException("Alteast one of the partition tags has to be a cluster expression for cluster " + cl.getName()); 
+        if (FeedHelper.evaluateClusterExp(cluster, part).equals(part)) {
+            throw new ValidationException(
+                    "Alteast one of the partition tags has to be a cluster expression for cluster " + cl.getName());
+        }
     }
 }


Mime
View raw message