Return-Path: X-Original-To: apmail-falcon-commits-archive@minotaur.apache.org Delivered-To: apmail-falcon-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 795C6F68C for ; Fri, 26 Apr 2013 15:50:51 +0000 (UTC) Received: (qmail 96488 invoked by uid 500); 26 Apr 2013 15:50:51 -0000 Delivered-To: apmail-falcon-commits-archive@falcon.apache.org Received: (qmail 96440 invoked by uid 500); 26 Apr 2013 15:50:51 -0000 Mailing-List: contact commits-help@falcon.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@falcon.incubator.apache.org Delivered-To: mailing list commits@falcon.incubator.apache.org Received: (qmail 96367 invoked by uid 99); 26 Apr 2013 15:50:51 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 Apr 2013 15:50:51 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD,T_FRT_PROFILE2 X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 26 Apr 2013 15:50:39 +0000 Received: (qmail 93663 invoked by uid 99); 26 Apr 2013 15:50:18 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 Apr 2013 15:50:18 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 4CE66882064; Fri, 26 Apr 2013 15:50:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sriksun@apache.org To: commits@falcon.incubator.apache.org Date: Fri, 26 Apr 2013 15:50:33 -0000 Message-Id: <45314e86324e469d9913de6d83794eaf@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [17/47] Fixes for Checkstyle X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java index d8751a2..ba618ef 100644 --- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java +++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java @@ -18,35 +18,23 @@ package org.apache.falcon.entity.parser; -import java.net.ConnectException; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TimeZone; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.falcon.FalconException; import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.process.Input; -import org.apache.falcon.entity.v0.process.Inputs; -import org.apache.falcon.entity.v0.process.LateInput; -import org.apache.falcon.entity.v0.process.Output; -import org.apache.falcon.entity.v0.process.Outputs; +import org.apache.falcon.entity.v0.process.*; import org.apache.falcon.entity.v0.process.Process; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.net.ConnectException; +import java.util.*; /** * Concrete Parser which has XML parsing and validation logic for Process XML. - * */ public class ProcessEntityParser extends EntityParser { @@ -56,17 +44,18 @@ public class ProcessEntityParser extends EntityParser { @Override public void validate(Process process) throws FalconException { - if(process.getTimezone() == null) + if (process.getTimezone() == null) { process.setTimezone(TimeZone.getTimeZone("UTC")); + } // check if dependent entities exists Set clusters = new HashSet(); for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) { String clusterName = cluster.getName(); - if (!clusters.add(cluster.getName())) { - throw new ValidationException("Cluster: " + cluster.getName() - + " is defined more than once for process: "+process.getName()); - } + if (!clusters.add(cluster.getName())) { + throw new ValidationException("Cluster: " + cluster.getName() + + " is defined more than once for process: " + process.getName()); + } validateEntityExists(EntityType.CLUSTER, clusterName); validateProcessValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd()); validateHDFSpaths(process, clusterName); @@ -98,25 +87,28 @@ public class ProcessEntityParser extends EntityParser { } private void validateHDFSpaths(Process process, String clusterName) throws FalconException { - org.apache.falcon.entity.v0.cluster.Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, clusterName); + org.apache.falcon.entity.v0.cluster.Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, + clusterName); String workflowPath = process.getWorkflow().getPath(); - String libPath=process.getWorkflow().getLib(); + String libPath = process.getWorkflow().getLib(); String nameNode = getNameNode(cluster, clusterName); try { Configuration configuration = new Configuration(); configuration.set("fs.default.name", nameNode); FileSystem fs = FileSystem.get(configuration); if (!fs.exists(new Path(workflowPath))) { - throw new ValidationException("Workflow path: " + workflowPath + " does not exists in HDFS: " + nameNode); + throw new ValidationException( + "Workflow path: " + workflowPath + " does not exists in HDFS: " + nameNode); } - - if (libPath!=null && !fs.exists(new Path(libPath))) { + + if (libPath != null && !fs.exists(new Path(libPath))) { throw new ValidationException("Lib path: " + libPath + " does not exists in HDFS: " + nameNode); } } catch (ValidationException e) { throw new ValidationException(e); } catch (ConnectException e) { - throw new ValidationException("Unable to connect to Namenode: " + nameNode + " referenced in cluster: " + clusterName); + throw new ValidationException( + "Unable to connect to Namenode: " + nameNode + " referenced in cluster: " + clusterName); } catch (Exception e) { throw new FalconException(e); } @@ -125,8 +117,9 @@ public class ProcessEntityParser extends EntityParser { private String getNameNode(Cluster cluster, String clusterName) throws ValidationException { // cluster should never be null as it is validated while submitting // feeds. - if (new Path(ClusterHelper.getStorageUrl(cluster)).toUri().getScheme()==null) { - throw new ValidationException("Cannot get valid nameNode scheme from write interface of cluster: " + clusterName); + if (new Path(ClusterHelper.getStorageUrl(cluster)).toUri().getScheme() == null) { + throw new ValidationException( + "Cannot get valid nameNode scheme from write interface of cluster: " + clusterName); } return ClusterHelper.getStorageUrl(cluster); } @@ -134,7 +127,8 @@ public class ProcessEntityParser extends EntityParser { private void validateProcessValidity(Date start, Date end) throws FalconException { try { if (!start.before(end)) { - throw new ValidationException("Process start time: " + start + " should be before process end time: " + end); + throw new ValidationException( + "Process start time: " + start + " should be before process end time: " + end); } } catch (ValidationException e) { throw new ValidationException(e); @@ -145,15 +139,15 @@ public class ProcessEntityParser extends EntityParser { private void validateDatasetName(Inputs inputs, Outputs outputs) throws ValidationException { Set datasetNames = new HashSet(); - if(inputs != null) { + if (inputs != null) { for (Input input : inputs.getInputs()) { if (!datasetNames.add(input.getName())) { throw new ValidationException("Input name: " + input.getName() + " is already used"); } } } - - if(outputs != null) { + + if (outputs != null) { for (Output output : outputs.getOutputs()) { if (!datasetNames.add(output.getName())) { throw new ValidationException("Output name: " + output.getName() + " is already used"); @@ -163,27 +157,28 @@ public class ProcessEntityParser extends EntityParser { } private void validateLateInputs(Process process) throws ValidationException { - Map feeds = new HashMap(); - if(process.getInputs() != null) { + Map feeds = new HashMap(); + if (process.getInputs() != null) { for (Input in : process.getInputs().getInputs()) { - feeds.put(in.getName(),in.getFeed()); + feeds.put(in.getName(), in.getFeed()); } } if (process.getLateProcess() != null) { - for (LateInput lp : process.getLateProcess().getLateInputs()) { - if (!feeds.keySet().contains(lp.getInput())){ - throw new ValidationException("Late Input: " + lp.getInput() + " is not specified in the inputs"); - } - try { - Feed feed = (Feed) ConfigurationStore.get().get(EntityType.FEED, feeds.get(lp.getInput())); - if(feed.getLateArrival()==null){ - throw new ValidationException("Late Input feed: "+lp.getInput()+" is not configured with late arrival cut-off" ); - } - } catch (FalconException e) { - throw new ValidationException(e); - } - } + for (LateInput lp : process.getLateProcess().getLateInputs()) { + if (!feeds.keySet().contains(lp.getInput())) { + throw new ValidationException("Late Input: " + lp.getInput() + " is not specified in the inputs"); + } + try { + Feed feed = (Feed) ConfigurationStore.get().get(EntityType.FEED, feeds.get(lp.getInput())); + if (feed.getLateArrival() == null) { + throw new ValidationException( + "Late Input feed: " + lp.getInput() + " is not configured with late arrival cut-off"); + } + } catch (FalconException e) { + throw new ValidationException(e); + } + } } } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/parser/ValidationException.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ValidationException.java b/common/src/main/java/org/apache/falcon/entity/parser/ValidationException.java index fc9ccdf..e01a378 100644 --- a/common/src/main/java/org/apache/falcon/entity/parser/ValidationException.java +++ b/common/src/main/java/org/apache/falcon/entity/parser/ValidationException.java @@ -21,22 +21,20 @@ package org.apache.falcon.entity.parser; import org.apache.falcon.FalconException; /** - * * ValidationException during parsing - * */ public class ValidationException extends FalconException { - public ValidationException(String message) { - super(message); - } + public ValidationException(String message) { + super(message); + } - public ValidationException(Exception e) { - super(e); + public ValidationException(Exception e) { + super(e); } - public ValidationException(String message, Exception e) { - super(message, e); + public ValidationException(String message, Exception e) { + super(message, e); } private static final long serialVersionUID = -4502166408759507355L; http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java index c882453..8fd3775 100644 --- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java +++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java @@ -18,25 +18,6 @@ package org.apache.falcon.entity.store; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.URLDecoder; -import java.net.URLEncoder; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import javax.xml.bind.JAXBException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.falcon.FalconException; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; @@ -44,8 +25,21 @@ import org.apache.falcon.service.ConfigurationChangeListener; import org.apache.falcon.service.FalconService; import org.apache.falcon.util.ReflectionUtils; import org.apache.falcon.util.StartupProperties; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; +import javax.xml.bind.JAXBException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + public class ConfigurationStore implements FalconService { private static final Logger LOG = Logger.getLogger(ConfigurationStore.class); @@ -62,7 +56,8 @@ public class ConfigurationStore implements FalconService { return store; } - private final Map> dictionary = new HashMap>(); + private final Map> dictionary + = new HashMap>(); private final FileSystem fs; private final Path storePath; @@ -98,13 +93,15 @@ public class ConfigurationStore implements FalconService { public void init() throws FalconException { String listenerClassNames = StartupProperties.get(). getProperty("configstore.listeners", "org.apache.falcon.entity.v0.EntityGraph"); - for(String listenerClassName:listenerClassNames.split(",")) { + for (String listenerClassName : listenerClassNames.split(",")) { listenerClassName = listenerClassName.trim(); - if (listenerClassName.isEmpty()) continue; + if (listenerClassName.isEmpty()) { + continue; + } ConfigurationChangeListener listener = ReflectionUtils.getInstanceByClassName(listenerClassName); registerListener(listener); } - + try { for (EntityType type : EntityType.values()) { ConcurrentHashMap entityMap = dictionary.get(type); @@ -113,7 +110,7 @@ public class ConfigurationStore implements FalconService { for (FileStatus file : files) { String fileName = file.getPath().getName(); String encodedEntityName = fileName.substring(0, fileName.length() - 4); // drop - // ".xml" + // ".xml" String entityName = URLDecoder.decode(encodedEntityName, UTF_8); Entity entity = restore(type, entityName); entityMap.put(entityName, entity); @@ -131,11 +128,8 @@ public class ConfigurationStore implements FalconService { } /** - * - * @param type - * - EntityType that need to be published - * @param entity - * - Reference to the Entity Object + * @param type - EntityType that need to be published + * @param entity - Reference to the Entity Object * @throws FalconException */ public synchronized void publish(EntityType type, Entity entity) throws FalconException { @@ -145,8 +139,9 @@ public class ConfigurationStore implements FalconService { dictionary.get(type).put(entity.getName(), entity); onAdd(entity); } else { - throw new EntityAlreadyExistsException(entity.toShortString() + " already registered with configuration store. " - + "Can't be submitted again. Try removing before submitting."); + throw new EntityAlreadyExistsException( + entity.toShortString() + " already registered with configuration store. " + + "Can't be submitted again. Try removing before submitting."); } } catch (IOException e) { throw new StoreAccessException(e); @@ -193,25 +188,22 @@ public class ConfigurationStore implements FalconService { private void onChange(Entity oldEntity, Entity newEntity) throws FalconException { for (ConfigurationChangeListener listener : listeners) { - listener.onChange(oldEntity, newEntity); + listener.onChange(oldEntity, newEntity); } } public synchronized void initiateUpdate(Entity entity) throws FalconException { if (get(entity.getEntityType(), entity.getName()) == null || updatesInProgress.get() != null) { - throw new FalconException("An update for " + entity.toShortString() + " is already in progress or doesn't exist"); + throw new FalconException( + "An update for " + entity.toShortString() + " is already in progress or doesn't exist"); } updatesInProgress.set(entity); } /** - * - * @param type - * - Entity type that is being retrieved - * @param name - * - Name as it appears in the entity xml definition - * @param - * - Actual Entity object type + * @param type - Entity type that is being retrieved + * @param name - Name as it appears in the entity xml definition + * @param - Actual Entity object type * @return - Entity object from internal dictionary, If the object is not * loaded in memory yet, it will retrieve it from persistent store * just in time. On startup all the entities will be added to the @@ -249,11 +241,9 @@ public class ConfigurationStore implements FalconService { /** * Remove an entity which is already stored in the config store - * - * @param type - * - Entity type being removed - * @param name - * - Name of the entity object being removed + * + * @param type - Entity type being removed + * @param name - Name of the entity object being removed * @return - True is remove is successful, false if request entity doesn't * exist * @throws FalconException @@ -279,19 +269,17 @@ public class ConfigurationStore implements FalconService { listener.onRemove(entity); } catch (Throwable e) { LOG.warn( - "Encountered exception while notifying " + listener + "(" + entity.getEntityType() + ") " + entity.getName(), + "Encountered exception while notifying " + listener + "(" + entity.getEntityType() + ") " + + entity.getName(), e); } } } /** - * - * @param type - * - Entity type that needs to be searched - * @param keywords - * - List of keywords to search for. only entities that have all - * the keywords being searched would be returned + * @param type - Entity type that needs to be searched + * @param keywords - List of keywords to search for. only entities that have all + * the keywords being searched would be returned * @return - Array of entity types */ public Entity[] search(EntityType type, String... keywords) { @@ -299,20 +287,17 @@ public class ConfigurationStore implements FalconService { } /** - * - * @param type - * - Entity type that is to be stored into persistent storage - * @param entity - * - entity to persist. JAXB Annotated entity will be marshalled - * to the persistent store. The convention used for storing the - * object:: PROP(config.store.uri)/{entitytype}/{entityname}.xml - * @throws java.io.IOException - * If any error in accessing the storage + * @param type - Entity type that is to be stored into persistent storage + * @param entity - entity to persist. JAXB Annotated entity will be marshalled + * to the persistent store. The convention used for storing the + * object:: PROP(config.store.uri)/{entitytype}/{entityname}.xml + * @throws java.io.IOException If any error in accessing the storage * @throws FalconException */ private void persist(EntityType type, Entity entity) throws IOException, FalconException { OutputStream out = fs - .create(new Path(storePath, type + Path.SEPARATOR + URLEncoder.encode(entity.getName(), UTF_8) + ".xml")); + .create(new Path(storePath, + type + Path.SEPARATOR + URLEncoder.encode(entity.getName(), UTF_8) + ".xml")); try { type.getMarshaller().marshal(entity, out); LOG.info("Persisted configuration " + type + "/" + entity.getName()); @@ -326,37 +311,31 @@ public class ConfigurationStore implements FalconService { /** * Archive removed configuration in the persistent store - * - * @param type - * - Entity type to archive - * @param name - * - name - * @throws IOException - * If any error in accessing the storage + * + * @param type - Entity type to archive + * @param name - name + * @throws IOException If any error in accessing the storage */ private void archive(EntityType type, String name) throws IOException { Path archivePath = new Path(storePath, "archive" + Path.SEPARATOR + type); fs.mkdirs(archivePath); - fs.rename(new Path(storePath, type + Path.SEPARATOR + URLEncoder.encode(name, UTF_8) + ".xml"), new Path(archivePath, - URLEncoder.encode(name, UTF_8) + "." + System.currentTimeMillis())); + fs.rename(new Path(storePath, type + Path.SEPARATOR + URLEncoder.encode(name, UTF_8) + ".xml"), + new Path(archivePath, + URLEncoder.encode(name, UTF_8) + "." + System.currentTimeMillis())); LOG.info("Archived configuration " + type + "/" + name); } /** - * - * @param type - * - Entity type to restore from persistent store - * @param name - * - Name of the entity to restore. - * @param - * - Actual entity object type + * @param type - Entity type to restore from persistent store + * @param name - Name of the entity to restore. + * @param - Actual entity object type * @return - De-serialized entity object restored from persistent store - * @throws IOException - * If any error in accessing the storage + * @throws IOException If any error in accessing the storage * @throws FalconException */ @SuppressWarnings("unchecked") - private synchronized T restore(EntityType type, String name) throws IOException, FalconException { + private synchronized T restore(EntityType type, String name) + throws IOException, FalconException { InputStream in = fs.open(new Path(storePath, type + Path.SEPARATOR + URLEncoder.encode(name, UTF_8) + ".xml")); try { @@ -379,5 +358,6 @@ public class ConfigurationStore implements FalconService { } @Override - public void destroy() { } + public void destroy() { + } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java b/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java index b9b504d..a231242 100644 --- a/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java +++ b/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java @@ -22,14 +22,14 @@ import org.apache.falcon.FalconException; public class StoreAccessException extends FalconException { - /** - * @param e Exception - */ - public StoreAccessException(String message, Exception e) { - super(message, e); - } + /** + * @param e Exception + */ + public StoreAccessException(String message, Exception e) { + super(message, e); + } - public StoreAccessException(Exception e) { - super(e); - } + public StoreAccessException(Exception e) { + super(e); + } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java b/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java index 75d6d8b..f2b66e5 100644 --- a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java +++ b/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java @@ -18,12 +18,6 @@ package org.apache.falcon.entity.v0; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - import org.apache.falcon.FalconException; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.feed.Feed; @@ -34,6 +28,12 @@ import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.service.ConfigurationChangeListener; import org.apache.log4j.Logger; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + public class EntityGraph implements ConfigurationChangeListener { private static Logger LOG = Logger.getLogger(EntityGraph.class); @@ -76,8 +76,9 @@ public class EntityGraph implements ConfigurationChangeListener { nodeEdges = getEdgesFor((Feed) entity); break; } - if (nodeEdges == null) + if (nodeEdges == null) { return; + } LOG.trace("Adding edges for " + entity.getName() + ": " + nodeEdges); for (Map.Entry> entry : nodeEdges.entrySet()) { @@ -101,8 +102,9 @@ public class EntityGraph implements ConfigurationChangeListener { nodeEdges = getEdgesFor((Feed) entity); break; } - if (nodeEdges == null) + if (nodeEdges == null) { return; + } for (Map.Entry> entry : nodeEdges.entrySet()) { if (graph.containsKey(entry.getKey())) { @@ -154,7 +156,7 @@ public class EntityGraph implements ConfigurationChangeListener { nodeEdges.put(clusterNode, new HashSet()); nodeEdges.get(clusterNode).add(processNode); } - + return nodeEdges; } @@ -188,19 +190,23 @@ public class EntityGraph implements ConfigurationChangeListener { @Override public boolean equals(Object o) { - if (this == o) + if (this == o) { return true; - if (o == null || getClass() != o.getClass()) + } + if (o == null || getClass() != o.getClass()) { return false; + } Node node = (Node) o; boolean nameEqual = name != null ? !name.equals(node.name) : node.name != null; - if (nameEqual) + if (nameEqual) { return false; - if (type != node.type) + } + if (type != node.type) { return false; + } return true; } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java b/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java index 1a9febc..b523c8b 100644 --- a/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java +++ b/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java @@ -18,21 +18,22 @@ package org.apache.falcon.entity.v0; +import org.apache.falcon.FalconException; +import org.apache.falcon.Pair; + import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Set; -import org.apache.falcon.FalconException; -import org.apache.falcon.Pair; - public class EntityIntegrityChecker { public static Pair[] referencedBy(Entity entity) throws FalconException { Set deps = EntityGraph.get().getDependents(entity); - if(deps == null) + if (deps == null) { return null; - + } + switch (entity.getEntityType()) { case CLUSTER: return filter(deps, EntityType.FEED, EntityType.PROCESS); @@ -46,12 +47,14 @@ public class EntityIntegrityChecker { } @SuppressWarnings("unchecked") - private static Pair[] filter(Set deps, EntityType ... types) { - List> filteredSet = new ArrayList>(); + private static Pair[] filter(Set deps, EntityType... types) { + List> filteredSet = new ArrayList>(); List validTypes = Arrays.asList(types); - for(Entity dep:deps) - if(validTypes.contains(dep.getEntityType())) + for (Entity dep : deps) { + if (validTypes.contains(dep.getEntityType())) { filteredSet.add(Pair.of(dep.getName(), dep.getEntityType())); + } + } return filteredSet.toArray(new Pair[0]); } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/v0/UnschedulableEntityException.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/v0/UnschedulableEntityException.java b/common/src/main/java/org/apache/falcon/entity/v0/UnschedulableEntityException.java index bbf2749..cad196b 100644 --- a/common/src/main/java/org/apache/falcon/entity/v0/UnschedulableEntityException.java +++ b/common/src/main/java/org/apache/falcon/entity/v0/UnschedulableEntityException.java @@ -24,7 +24,6 @@ import org.apache.falcon.FalconException; * This exception is thrown when Unschedulable entity * like CLUSTER is tried with actions like Schedule, Suspend, * Resume. - * */ public class UnschedulableEntityException extends FalconException { http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java index 02e76a7..f57ef95 100644 --- a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java +++ b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java @@ -49,12 +49,13 @@ public final class ExpressionHelper implements FunctionMapper, VariableResolver return instance; } - private ExpressionHelper() {} + private ExpressionHelper() { + } public T evaluate(String expression, Class clazz) throws FalconException { return evaluateFullExpression("${" + expression + "}", clazz); } - + @SuppressWarnings("unchecked") public T evaluateFullExpression(String expression, Class clazz) throws FalconException { try { @@ -67,8 +68,9 @@ public final class ExpressionHelper implements FunctionMapper, VariableResolver @Override public Method resolveFunction(String prefix, String name) { for (Method method : ExpressionHelper.class.getDeclaredMethods()) { - if (method.getName().equals(name)) + if (method.getName().equals(name)) { return method; + } } throw new UnsupportedOperationException("Not found " + prefix + ":" + name); } @@ -146,15 +148,15 @@ public final class ExpressionHelper implements FunctionMapper, VariableResolver } public static Date latest(int n) { - //by pass Falcon validations - return referenceDate.get(); + //by pass Falcon validations + return referenceDate.get(); } - + public static Date future(int n, int limit) { - //by pass Falcon validations - return referenceDate.get(); + //by pass Falcon validations + return referenceDate.get(); } - + public static long hours(int val) { return TimeUnit.HOURS.toMillis(val); } @@ -180,19 +182,19 @@ public final class ExpressionHelper implements FunctionMapper, VariableResolver } public static String substitute(String originalValue, Properties properties) { - Matcher envVarMatcher = sysPropertyPattern.matcher(originalValue); - while (envVarMatcher.find()) { - String envVar = originalValue.substring(envVarMatcher.start() + 2, - envVarMatcher.end() - 1); - String envVal = properties.getProperty(envVar, System.getenv(envVar)); - - envVar = "\\$\\{" + envVar + "\\}"; - if (envVal != null) { - originalValue = originalValue.replaceAll(envVar, Matcher.quoteReplacement(envVal)); - envVarMatcher = sysPropertyPattern.matcher(originalValue); + Matcher envVarMatcher = sysPropertyPattern.matcher(originalValue); + while (envVarMatcher.find()) { + String envVar = originalValue.substring(envVarMatcher.start() + 2, + envVarMatcher.end() - 1); + String envVal = properties.getProperty(envVar, System.getenv(envVar)); + + envVar = "\\$\\{" + envVar + "\\}"; + if (envVal != null) { + originalValue = originalValue.replaceAll(envVar, Matcher.quoteReplacement(envVal)); + envVarMatcher = sysPropertyPattern.matcher(originalValue); + } } - } - return originalValue; + return originalValue; } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/group/FeedGroup.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroup.java b/common/src/main/java/org/apache/falcon/group/FeedGroup.java index 4dade88..e40ab13 100644 --- a/common/src/main/java/org/apache/falcon/group/FeedGroup.java +++ b/common/src/main/java/org/apache/falcon/group/FeedGroup.java @@ -17,6 +17,11 @@ */ package org.apache.falcon.group; +import org.apache.falcon.entity.FeedHelper; +import org.apache.falcon.entity.common.FeedDataPath; +import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.entity.v0.feed.LocationType; + import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -24,80 +29,75 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Matcher; -import org.apache.falcon.entity.FeedHelper; -import org.apache.falcon.entity.common.FeedDataPath; -import org.apache.falcon.entity.v0.Frequency; -import org.apache.falcon.entity.v0.feed.LocationType; - /** * Group, which represents a logical group of feeds which can belong to this * group. */ public class FeedGroup { - - public FeedGroup(String group, Frequency frequency, String path) { - this.name = group; - this.frequency = frequency; - this.datePattern = getDatePattern(path); - this.feeds = Collections - .newSetFromMap(new ConcurrentHashMap()); - } - public static String getDatePattern(String path) { - Matcher matcher = FeedDataPath.PATTERN.matcher(path); - List fields = new ArrayList(); - while (matcher.find()) { - String var = path.substring(matcher.start(), matcher.end()); - fields.add(var); - } - Collections.sort(fields); - return fields.toString(); - } + public FeedGroup(String group, Frequency frequency, String path) { + this.name = group; + this.frequency = frequency; + this.datePattern = getDatePattern(path); + this.feeds = Collections + .newSetFromMap(new ConcurrentHashMap()); + } + + public static String getDatePattern(String path) { + Matcher matcher = FeedDataPath.PATTERN.matcher(path); + List fields = new ArrayList(); + while (matcher.find()) { + String var = path.substring(matcher.start(), matcher.end()); + fields.add(var); + } + Collections.sort(fields); + return fields.toString(); + } - private String name; - private Frequency frequency; - private String datePattern; - private Set feeds; + private String name; + private Frequency frequency; + private String datePattern; + private Set feeds; - public Set getFeeds() { - return feeds; - } + public Set getFeeds() { + return feeds; + } - @Override - public boolean equals(Object obj) { - if (!(obj instanceof FeedGroup) || obj == null) { - return false; - } - FeedGroup group = (FeedGroup) obj; - return (this.name.equals(group.getName()) - && this.frequency.equals(group.frequency) - && this.datePattern - .equals(group.datePattern)); + @Override + public boolean equals(Object obj) { + if (!(obj instanceof FeedGroup) || obj == null) { + return false; + } + FeedGroup group = (FeedGroup) obj; + return (this.name.equals(group.getName()) + && this.frequency.equals(group.frequency) + && this.datePattern + .equals(group.datePattern)); - } + } - @Override - public int hashCode() { - return 127 * name.hashCode() + 31 * frequency.hashCode() + datePattern.hashCode(); - } + @Override + public int hashCode() { + return 127 * name.hashCode() + 31 * frequency.hashCode() + datePattern.hashCode(); + } - public String getName() { - return name; - } + public String getName() { + return name; + } - public Frequency getFrequency() { - return frequency; - } + public Frequency getFrequency() { + return frequency; + } - public String getDatePattern() { - return datePattern; - } + public String getDatePattern() { + return datePattern; + } - public boolean canContainFeed(org.apache.falcon.entity.v0.feed.Feed feed) { - if (this.frequency.equals(feed.getFrequency()) - && this.datePattern.equals(getDatePattern(FeedHelper.getLocation(feed, LocationType.DATA).getPath()))) { - return true; - } - return false; - } + public boolean canContainFeed(org.apache.falcon.entity.v0.feed.Feed feed) { + if (this.frequency.equals(feed.getFrequency()) + && this.datePattern.equals(getDatePattern(FeedHelper.getLocation(feed, LocationType.DATA).getPath()))) { + return true; + } + return false; + } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java index e154a14..532392f 100644 --- a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java +++ b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java @@ -17,12 +17,6 @@ */ package org.apache.falcon.group; -import java.util.Collections; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - import org.apache.commons.lang.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.entity.FeedHelper; @@ -33,88 +27,94 @@ import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.LocationType; import org.apache.falcon.service.ConfigurationChangeListener; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + /** * Has 2 way mappings from feed to group and group to feed */ public class FeedGroupMap implements ConfigurationChangeListener { - private static final FeedGroupMap instance = new FeedGroupMap(); - private Map groupsMapping = new ConcurrentHashMap(); - - private FeedGroupMap() { - // singleton - } - - public static FeedGroupMap get() { - return instance; - } - - public Map getGroupsMapping() { - return Collections.unmodifiableMap(groupsMapping); - } - - @Override - public void onAdd(Entity entity) throws FalconException { - - if (entity.getEntityType().equals(EntityType.FEED)) { - Feed feed = (Feed) entity; - if (feed.getGroups() == null || feed.getGroups().equals("")) { - return; - } - Set groupSet = getGroups(feed); - addGroups(feed.getName(), groupSet); - } - - } - - @Override - public void onRemove(Entity entity) throws FalconException { - if (entity.getEntityType().equals(EntityType.FEED)) { - Feed feed = (Feed) entity; - if (StringUtils.isEmpty(feed.getGroups())) { - return; - } - String[] groups = feed.getGroups().split(","); - for (String group : groups) { - groupsMapping.get(group).getFeeds().remove(entity.getName()); - if (groupsMapping.get(group).getFeeds().size() == 0) { - groupsMapping.remove(group); - } - } - - } - - } - - @Override - public void onChange(Entity oldEntity, Entity newEntity) - throws FalconException { - onRemove(oldEntity); - onAdd(newEntity); - } - - private void addGroups(String feed, Set groups) { - for (FeedGroup group : groups) { - if (groupsMapping.containsKey(group.getName())) { - groupsMapping.get(group.getName()).getFeeds().add(feed); - } else { - group.getFeeds().add(feed); - groupsMapping.put(group.getName(), group); - } - } - } - - public Set getGroups(String groups, Frequency frequency, String path) { - Set groupSet = new HashSet(); - String[] groupArray = groups.split(","); - for (String group : groupArray) { - groupSet.add(new FeedGroup(group, frequency, path)); - } - return groupSet; - } - - public Set getGroups(org.apache.falcon.entity.v0.feed.Feed feed) { - return getGroups(feed.getGroups(), feed.getFrequency(), - FeedHelper.getLocation(feed, LocationType.DATA).getPath()); - } + private static final FeedGroupMap instance = new FeedGroupMap(); + private Map groupsMapping = new ConcurrentHashMap(); + + private FeedGroupMap() { + // singleton + } + + public static FeedGroupMap get() { + return instance; + } + + public Map getGroupsMapping() { + return Collections.unmodifiableMap(groupsMapping); + } + + @Override + public void onAdd(Entity entity) throws FalconException { + + if (entity.getEntityType().equals(EntityType.FEED)) { + Feed feed = (Feed) entity; + if (feed.getGroups() == null || feed.getGroups().equals("")) { + return; + } + Set groupSet = getGroups(feed); + addGroups(feed.getName(), groupSet); + } + + } + + @Override + public void onRemove(Entity entity) throws FalconException { + if (entity.getEntityType().equals(EntityType.FEED)) { + Feed feed = (Feed) entity; + if (StringUtils.isEmpty(feed.getGroups())) { + return; + } + String[] groups = feed.getGroups().split(","); + for (String group : groups) { + groupsMapping.get(group).getFeeds().remove(entity.getName()); + if (groupsMapping.get(group).getFeeds().size() == 0) { + groupsMapping.remove(group); + } + } + + } + + } + + @Override + public void onChange(Entity oldEntity, Entity newEntity) + throws FalconException { + onRemove(oldEntity); + onAdd(newEntity); + } + + private void addGroups(String feed, Set groups) { + for (FeedGroup group : groups) { + if (groupsMapping.containsKey(group.getName())) { + groupsMapping.get(group.getName()).getFeeds().add(feed); + } else { + group.getFeeds().add(feed); + groupsMapping.put(group.getName(), group); + } + } + } + + public Set getGroups(String groups, Frequency frequency, String path) { + Set groupSet = new HashSet(); + String[] groupArray = groups.split(","); + for (String group : groupArray) { + groupSet.add(new FeedGroup(group, frequency, path)); + } + return groupSet; + } + + public Set getGroups(org.apache.falcon.entity.v0.feed.Feed feed) { + return getGroups(feed.getGroups(), feed.getFrequency(), + FeedHelper.getLocation(feed, LocationType.DATA).getPath()); + } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/security/CurrentUser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/security/CurrentUser.java b/common/src/main/java/org/apache/falcon/security/CurrentUser.java index 80c4de9..9a3086c 100644 --- a/common/src/main/java/org/apache/falcon/security/CurrentUser.java +++ b/common/src/main/java/org/apache/falcon/security/CurrentUser.java @@ -40,7 +40,9 @@ public final class CurrentUser { throw new IllegalStateException ("Bad user name sent for authentication"); } - if (user.equals(getUserInternal())) return; + if (user.equals(getUserInternal())) { + return; + } Subject subject = new Subject(); subject.getPrincipals().add(new FalconPrincipal(user)); @@ -66,7 +68,7 @@ public final class CurrentUser { if (subject == null) { return null; } else { - for(FalconPrincipal principal: subject. + for (FalconPrincipal principal : subject. getPrincipals(FalconPrincipal.class)) { return principal.getName(); } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/security/FalconLoginModule.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/security/FalconLoginModule.java b/common/src/main/java/org/apache/falcon/security/FalconLoginModule.java index d6fa26d..a27a342 100644 --- a/common/src/main/java/org/apache/falcon/security/FalconLoginModule.java +++ b/common/src/main/java/org/apache/falcon/security/FalconLoginModule.java @@ -40,7 +40,7 @@ public class FalconLoginModule implements LoginModule { } private T getCanonicalUser(Class cls) { - for(T user: subject.getPrincipals(cls)) { + for (T user : subject.getPrincipals(cls)) { return user; } return null; http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/security/FalconSecurityConfiguration.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/security/FalconSecurityConfiguration.java b/common/src/main/java/org/apache/falcon/security/FalconSecurityConfiguration.java index b88f27a..20ec8df 100644 --- a/common/src/main/java/org/apache/falcon/security/FalconSecurityConfiguration.java +++ b/common/src/main/java/org/apache/falcon/security/FalconSecurityConfiguration.java @@ -27,7 +27,7 @@ public class FalconSecurityConfiguration extends Configuration { private static final AppConfigurationEntry OS_SPECIFIC_LOGIN = new AppConfigurationEntry(SecurityConstants.OS_LOGIN_MODULE_NAME, AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, - new HashMap()); + new HashMap()); private static final AppConfigurationEntry[] SIMPLE_CONF = new AppConfigurationEntry[]{OS_SPECIFIC_LOGIN}; http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/service/LogCleanupService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/service/LogCleanupService.java b/common/src/main/java/org/apache/falcon/service/LogCleanupService.java index c1fe265..17d5926 100644 --- a/common/src/main/java/org/apache/falcon/service/LogCleanupService.java +++ b/common/src/main/java/org/apache/falcon/service/LogCleanupService.java @@ -17,13 +17,6 @@ */ package org.apache.falcon.service; -import java.util.Date; -import java.util.Timer; -import java.util.TimerTask; - -import javax.servlet.jsp.el.ELException; -import javax.servlet.jsp.el.ExpressionEvaluator; - import org.apache.commons.el.ExpressionEvaluatorImpl; import org.apache.falcon.FalconException; import org.apache.falcon.aspect.GenericAlert; @@ -34,58 +27,64 @@ import org.apache.falcon.expression.ExpressionHelper; import org.apache.falcon.util.StartupProperties; import org.apache.log4j.Logger; +import javax.servlet.jsp.el.ELException; +import javax.servlet.jsp.el.ExpressionEvaluator; +import java.util.Date; +import java.util.Timer; +import java.util.TimerTask; + public class LogCleanupService implements FalconService { - private static final Logger LOG = Logger.getLogger(LogCleanupService.class); - private final ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl(); - private final ExpressionHelper resolver = ExpressionHelper.get(); + private static final Logger LOG = Logger.getLogger(LogCleanupService.class); + private final ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl(); + private final ExpressionHelper resolver = ExpressionHelper.get(); - @Override - public String getName() { - return "Falcon Log cleanup service"; - } + @Override + public String getName() { + return "Falcon Log cleanup service"; + } - @Override - public void init() throws FalconException { - Timer timer = new Timer(); - timer.schedule(new CleanupThread(), 0, getDelay()); - LOG.info("Falcon log cleanup service initialized"); + @Override + public void init() throws FalconException { + Timer timer = new Timer(); + timer.schedule(new CleanupThread(), 0, getDelay()); + LOG.info("Falcon log cleanup service initialized"); - } + } - private class CleanupThread extends TimerTask { + private class CleanupThread extends TimerTask { - private AbstractCleanupHandler processCleanupHandler = new ProcessCleanupHandler(); - private AbstractCleanupHandler feedCleanupHandler = new FeedCleanupHandler(); + private AbstractCleanupHandler processCleanupHandler = new ProcessCleanupHandler(); + private AbstractCleanupHandler feedCleanupHandler = new FeedCleanupHandler(); - @Override - public void run() { - try { - LOG.info("Cleaning up logs at: " + new Date()); - processCleanupHandler.cleanup(); - feedCleanupHandler.cleanup(); - } catch (Throwable t) { - LOG.error("Error in cleanup task: ", t); - GenericAlert.alertLogCleanupServiceFailed( - "Exception in log cleanup service", t); - } - } - } + @Override + public void run() { + try { + LOG.info("Cleaning up logs at: " + new Date()); + processCleanupHandler.cleanup(); + feedCleanupHandler.cleanup(); + } catch (Throwable t) { + LOG.error("Error in cleanup task: ", t); + GenericAlert.alertLogCleanupServiceFailed( + "Exception in log cleanup service", t); + } + } + } - @Override - public void destroy() throws FalconException { - LOG.info("Falcon log cleanup service destroyed"); - } + @Override + public void destroy() throws FalconException { + LOG.info("Falcon log cleanup service destroyed"); + } - private long getDelay() throws FalconException { - String delay = StartupProperties.get().getProperty( - "falcon.cleanup.service.frequency", "days(1)"); - try { - return (Long) EVALUATOR.evaluate("${" + delay + "}", Long.class, - resolver, resolver); - } catch (ELException e) { - throw new FalconException("Exception in EL evaluation", e); - } - } + private long getDelay() throws FalconException { + String delay = StartupProperties.get().getProperty( + "falcon.cleanup.service.frequency", "days(1)"); + try { + return (Long) EVALUATOR.evaluate("${" + delay + "}", Long.class, + resolver, resolver); + } catch (ELException e) { + throw new FalconException("Exception in EL evaluation", e); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/service/ServiceInitializer.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/service/ServiceInitializer.java b/common/src/main/java/org/apache/falcon/service/ServiceInitializer.java index f44ad12..466cb81 100644 --- a/common/src/main/java/org/apache/falcon/service/ServiceInitializer.java +++ b/common/src/main/java/org/apache/falcon/service/ServiceInitializer.java @@ -33,13 +33,15 @@ public class ServiceInitializer { getProperty("application.services", "org.apache.falcon.entity.store.ConfigurationStore"); for (String serviceClassName : serviceClassNames.split(",")) { serviceClassName = serviceClassName.trim(); - if (serviceClassName.isEmpty()) continue; + if (serviceClassName.isEmpty()) { + continue; + } FalconService service = ReflectionUtils.getInstanceByClassName(serviceClassName); services.register(service); LOG.info("Initializing service : " + serviceClassName); try { service.init(); - } catch(Throwable t) { + } catch (Throwable t) { LOG.fatal("Failed to initialize service " + serviceClassName, t); throw new FalconException(t); } @@ -52,7 +54,7 @@ public class ServiceInitializer { LOG.info("Destroying service : " + service.getClass().getName()); try { service.destroy(); - } catch(Throwable t) { + } catch (Throwable t) { LOG.fatal("Failed to destroy service " + service.getClass().getName(), t); throw new FalconException(t); } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/service/Services.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/service/Services.java b/common/src/main/java/org/apache/falcon/service/Services.java index dc41d5d..955e906 100644 --- a/common/src/main/java/org/apache/falcon/service/Services.java +++ b/common/src/main/java/org/apache/falcon/service/Services.java @@ -18,22 +18,23 @@ package org.apache.falcon.service; +import org.apache.falcon.FalconException; +import org.apache.falcon.util.ReflectionUtils; +import org.apache.log4j.Logger; + import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.NoSuchElementException; -import org.apache.falcon.FalconException; -import org.apache.falcon.util.ReflectionUtils; -import org.apache.log4j.Logger; - public final class Services implements Iterable { private static final Logger LOG = Logger.getLogger(Services.class); private static Services instance = new Services(); - private Services() { } + private Services() { + } public static Services get() { return instance; @@ -80,8 +81,8 @@ public final class Services implements Iterable { register(service); return service; } - - public void reset(){ - services.clear(); + + public void reset() { + services.clear(); } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/update/UpdateHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java index 7f36e12..4e199da 100644 --- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java +++ b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java @@ -18,9 +18,6 @@ package org.apache.falcon.update; -import java.util.ArrayList; -import java.util.List; - import org.apache.falcon.FalconException; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.FeedHelper; @@ -36,31 +33,40 @@ import org.apache.falcon.entity.v0.process.Inputs; import org.apache.falcon.entity.v0.process.Process; import org.apache.log4j.Logger; +import java.util.ArrayList; +import java.util.List; + public final class UpdateHelper { private static final Logger LOG = Logger.getLogger(UpdateHelper.class); - private static final String[] FEED_FIELDS = new String[] { "partitions", "groups", "lateArrival.cutOff", "schema.location", "schema.provider", - "ACL.group", "ACL.owner", "ACL.permission"}; - private static final String[] PROCESS_FIELDS = new String[] { "retry.policy", "retry.delay", "retry.attempts", - "lateProcess.policy", "lateProcess.delay", "lateProcess.lateInputs[\\d+].input", "lateProcess.lateInputs[\\d+].workflowPath"}; - + private static final String[] FEED_FIELDS = new String[]{"partitions", "groups", "lateArrival.cutOff", + "schema.location", "schema.provider", + "ACL.group", "ACL.owner", "ACL.permission"}; + private static final String[] PROCESS_FIELDS = new String[]{"retry.policy", "retry.delay", "retry.attempts", + "lateProcess.policy", "lateProcess.delay", + "lateProcess.lateInputs[\\d+].input", + "lateProcess.lateInputs[\\d+].workflowPath"}; + public static boolean shouldUpdate(Entity oldEntity, Entity newEntity, String cluster) throws FalconException { Entity oldView = EntityUtil.getClusterView(oldEntity, cluster); Entity newView = EntityUtil.getClusterView(newEntity, cluster); - switch(oldEntity.getEntityType()) { + switch (oldEntity.getEntityType()) { case FEED: - if(EntityUtil.equals(oldView, newView, FEED_FIELDS)) + if (EntityUtil.equals(oldView, newView, FEED_FIELDS)) { return false; + } return true; - + case PROCESS: - if(EntityUtil.equals(oldView, newView, PROCESS_FIELDS)) + if (EntityUtil.equals(oldView, newView, PROCESS_FIELDS)) { return false; + } return true; } throw new IllegalArgumentException("Unhandled entity type " + oldEntity.getEntityType()); } - public static boolean shouldUpdate(Entity oldEntity, Entity newEntity, Entity affectedEntity) throws FalconException { + public static boolean shouldUpdate(Entity oldEntity, Entity newEntity, Entity affectedEntity) + throws FalconException { if (oldEntity.getEntityType() == EntityType.FEED && affectedEntity.getEntityType() == EntityType.PROCESS) { return shouldUpdate((Feed) oldEntity, (Feed) newEntity, (Process) affectedEntity); } else { @@ -71,31 +77,33 @@ public final class UpdateHelper { } public static boolean shouldUpdate(Feed oldFeed, Feed newFeed, Process affectedProcess) { - if (!FeedHelper - .getLocation(oldFeed.getLocations(), LocationType.DATA) - .getPath() - .equals(FeedHelper.getLocation(newFeed.getLocations(), - LocationType.DATA).getPath()) - || !FeedHelper - .getLocation(oldFeed.getLocations(), LocationType.META) - .getPath() - .equals(FeedHelper.getLocation(newFeed.getLocations(), - LocationType.META).getPath()) - || !FeedHelper - .getLocation(oldFeed.getLocations(), LocationType.STATS) - .getPath() - .equals(FeedHelper.getLocation(newFeed.getLocations(), - LocationType.STATS).getPath()) - || !FeedHelper - .getLocation(oldFeed.getLocations(), LocationType.TMP) - .getPath() - .equals(FeedHelper.getLocation(newFeed.getLocations(), - LocationType.TMP).getPath())) + if (!FeedHelper + .getLocation(oldFeed.getLocations(), LocationType.DATA) + .getPath() + .equals(FeedHelper.getLocation(newFeed.getLocations(), + LocationType.DATA).getPath()) + || !FeedHelper + .getLocation(oldFeed.getLocations(), LocationType.META) + .getPath() + .equals(FeedHelper.getLocation(newFeed.getLocations(), + LocationType.META).getPath()) + || !FeedHelper + .getLocation(oldFeed.getLocations(), LocationType.STATS) + .getPath() + .equals(FeedHelper.getLocation(newFeed.getLocations(), + LocationType.STATS).getPath()) + || !FeedHelper + .getLocation(oldFeed.getLocations(), LocationType.TMP) + .getPath() + .equals(FeedHelper.getLocation(newFeed.getLocations(), + LocationType.TMP).getPath())) { return true; + } LOG.debug(oldFeed.toShortString() + ": Location identical. Ignoring..."); - if (!oldFeed.getFrequency().equals(newFeed.getFrequency())) + if (!oldFeed.getFrequency().equals(newFeed.getFrequency())) { return true; + } LOG.debug(oldFeed.toShortString() + ": Frequency identical. Ignoring..."); // it is not possible to have oldFeed partitions as non empty and @@ -118,39 +126,42 @@ public final class UpdateHelper { if (newFeed.getPartitions() != null && oldFeed.getPartitions() != null) { List newParts = getPartitions(newFeed.getPartitions()); List oldParts = getPartitions(oldFeed.getPartitions()); - if (newParts.size() != oldParts.size()) + if (newParts.size() != oldParts.size()) { return true; - if (!newParts.containsAll(oldParts)) + } + if (!newParts.containsAll(oldParts)) { return true; + } } LOG.debug(oldFeed.toShortString() + ": Partitions identical. Ignoring..."); } } for (Cluster cluster : affectedProcess.getClusters().getClusters()) { - if (!FeedHelper - .getCluster(oldFeed, cluster.getName()) - .getValidity() - .getStart() - .equals(FeedHelper.getCluster(newFeed, cluster.getName()) - .getValidity().getStart()) - || !FeedHelper.getLocation(oldFeed, LocationType.DATA, - cluster.getName()).getPath().equals( - FeedHelper.getLocation(newFeed, LocationType.DATA, - cluster.getName()).getPath()) - || !FeedHelper.getLocation(oldFeed, LocationType.META, - cluster.getName()).getPath().equals( - FeedHelper.getLocation(newFeed, LocationType.META, - cluster.getName()).getPath()) - || !FeedHelper.getLocation(oldFeed, LocationType.STATS, - cluster.getName()).getPath().equals( - FeedHelper.getLocation(newFeed, LocationType.STATS, - cluster.getName()).getPath()) - || !FeedHelper.getLocation(oldFeed, LocationType.TMP, - cluster.getName()).getPath().equals( - FeedHelper.getLocation(newFeed, LocationType.TMP, - cluster.getName()).getPath())) - return true; + if (!FeedHelper + .getCluster(oldFeed, cluster.getName()) + .getValidity() + .getStart() + .equals(FeedHelper.getCluster(newFeed, cluster.getName()) + .getValidity().getStart()) + || !FeedHelper.getLocation(oldFeed, LocationType.DATA, + cluster.getName()).getPath().equals( + FeedHelper.getLocation(newFeed, LocationType.DATA, + cluster.getName()).getPath()) + || !FeedHelper.getLocation(oldFeed, LocationType.META, + cluster.getName()).getPath().equals( + FeedHelper.getLocation(newFeed, LocationType.META, + cluster.getName()).getPath()) + || !FeedHelper.getLocation(oldFeed, LocationType.STATS, + cluster.getName()).getPath().equals( + FeedHelper.getLocation(newFeed, LocationType.STATS, + cluster.getName()).getPath()) + || !FeedHelper.getLocation(oldFeed, LocationType.TMP, + cluster.getName()).getPath().equals( + FeedHelper.getLocation(newFeed, LocationType.TMP, + cluster.getName()).getPath())) { + return true; + } LOG.debug(oldFeed.toShortString() + ": Feed on cluster" + cluster.getName() + " identical. Ignoring..."); } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java b/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java index f618876..f5dbc83 100644 --- a/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java +++ b/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java @@ -18,6 +18,10 @@ package org.apache.falcon.util; +import org.apache.falcon.FalconException; +import org.apache.falcon.expression.ExpressionHelper; +import org.apache.log4j.Logger; + import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -26,10 +30,6 @@ import java.util.HashSet; import java.util.Properties; import java.util.Set; -import org.apache.falcon.FalconException; -import org.apache.falcon.expression.ExpressionHelper; -import org.apache.log4j.Logger; - public abstract class ApplicationProperties extends Properties { private static Logger LOG = Logger.getLogger(ApplicationProperties.class); @@ -52,7 +52,7 @@ public abstract class ApplicationProperties extends Properties { public String getDomain() { return domain; } - + protected void initialize() { String propFile = getPropertyFile(); String userHome = System.getProperty("user.home"); @@ -68,7 +68,8 @@ public abstract class ApplicationProperties extends Properties { location = LocationType.FILE; propertyFile = new File(confDir, propFile).getAbsolutePath(); } else { - LOG.info("config.location is not set, properties file not present in " + "user home dir, falling back to classpath for " + LOG.info("config.location is not set, properties file not present in " + + "user home dir, falling back to classpath for " + propFile); location = LocationType.CLASSPATH; propertyFile = propFile; @@ -99,15 +100,16 @@ public abstract class ApplicationProperties extends Properties { LOG.info("Loading properties from " + propertyFile); Properties origProps = new Properties(); origProps.load(resource); - if(domain == null) { + if (domain == null) { domain = origProps.getProperty("*.domain"); - if(domain == null) + if (domain == null) { throw new FalconException("Domain is not set!"); + } } LOG.info("Initializing properties with domain " + domain); - + Set keys = getKeys(origProps.keySet()); - for(String key:keys) { + for (String key : keys) { String value = origProps.getProperty(domain + "." + key, origProps.getProperty("*." + key)); value = ExpressionHelper.substitute(value); LOG.debug(key + "=" + value); @@ -124,7 +126,7 @@ public abstract class ApplicationProperties extends Properties { private Set getKeys(Set keySet) { Set keys = new HashSet(); - for(Object keyObj:keySet) { + for (Object keyObj : keySet) { String key = (String) keyObj; keys.add(key.substring(key.indexOf('.') + 1)); } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/util/BuildProperties.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/BuildProperties.java b/common/src/main/java/org/apache/falcon/util/BuildProperties.java index e91a647..898daee 100644 --- a/common/src/main/java/org/apache/falcon/util/BuildProperties.java +++ b/common/src/main/java/org/apache/falcon/util/BuildProperties.java @@ -27,26 +27,26 @@ public class BuildProperties extends ApplicationProperties { private static final String PROPERTY_FILE = "falcon-buildinfo.properties"; private static final AtomicReference instance = - new AtomicReference(); + new AtomicReference(); private BuildProperties() throws FalconException { - super(); + super(); } @Override protected String getPropertyFile() { - return PROPERTY_FILE; + return PROPERTY_FILE; } public static Properties get() { - try { - if (instance.get() == null) { - instance.compareAndSet(null, new BuildProperties()); + try { + if (instance.get() == null) { + instance.compareAndSet(null, new BuildProperties()); + } + return instance.get(); + } catch (FalconException e) { + throw new RuntimeException("Unable to read application " + + "falcon build information properties", e); } - return instance.get(); - } catch (FalconException e) { - throw new RuntimeException("Unable to read application " + - "falcon build information properties", e); - } } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java b/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java index d13b817..4e2f7db 100644 --- a/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java +++ b/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java @@ -27,26 +27,26 @@ public class DeploymentProperties extends ApplicationProperties { private static final String PROPERTY_FILE = "deploy.properties"; private static final AtomicReference instance = - new AtomicReference(); + new AtomicReference(); private DeploymentProperties() throws FalconException { - super(); + super(); } @Override protected String getPropertyFile() { - return PROPERTY_FILE; + return PROPERTY_FILE; } public static Properties get() { - try { - if (instance.get() == null) { - instance.compareAndSet(null, new DeploymentProperties()); + try { + if (instance.get() == null) { + instance.compareAndSet(null, new DeploymentProperties()); + } + return instance.get(); + } catch (FalconException e) { + throw new RuntimeException("Unable to read application " + + "startup properties", e); } - return instance.get(); - } catch (FalconException e) { - throw new RuntimeException("Unable to read application " + - "startup properties", e); - } } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java b/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java index 86acb81..9aeb3ab 100644 --- a/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java +++ b/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java @@ -18,12 +18,12 @@ package org.apache.falcon.util; -import java.util.HashSet; -import java.util.Set; - import org.apache.falcon.entity.ColoClusterRelation; import org.apache.log4j.Logger; +import java.util.HashSet; +import java.util.Set; + public class DeploymentUtil { private static final Logger LOG = Logger.getLogger(DeploymentUtil.class); @@ -35,7 +35,7 @@ public class DeploymentUtil { protected final static String currentColo; protected final static boolean embeddedMode; protected static boolean prism = false; - + static { DEFAULT_ALL_COLOS.add(DEFAULT_COLO); embeddedMode = DeploymentProperties.get(). @@ -44,33 +44,33 @@ public class DeploymentUtil { currentColo = DEFAULT_COLO; } else { currentColo = StartupProperties.get(). - getProperty("current.colo", DEFAULT_COLO); + getProperty("current.colo", DEFAULT_COLO); } LOG.info("Running in embedded mode? " + embeddedMode); LOG.info("Current colo: " + currentColo); } - + public static void setPrismMode() { - prism = true; + prism = true; } - + public static boolean isPrism() { - return !embeddedMode && prism; + return !embeddedMode && prism; } - + public static String getCurrentColo() { return currentColo; } - + public static Set getCurrentClusters() { String colo = getCurrentColo(); return ColoClusterRelation.get().getClusters(colo); } - + public static boolean isEmbeddedMode() { return embeddedMode; } - + public static String getDefaultColo() { return DEFAULT_COLO; } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java b/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java index 8f255fd..d1bed8e 100644 --- a/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java +++ b/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java @@ -18,10 +18,10 @@ package org.apache.falcon.util; -import java.lang.reflect.Method; - import org.apache.falcon.FalconException; +import java.lang.reflect.Method; + public final class ReflectionUtils { public static T getInstance(String classKey) throws FalconException { http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/util/RuntimeProperties.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/RuntimeProperties.java b/common/src/main/java/org/apache/falcon/util/RuntimeProperties.java index 6ecf33f..86a54f8 100644 --- a/common/src/main/java/org/apache/falcon/util/RuntimeProperties.java +++ b/common/src/main/java/org/apache/falcon/util/RuntimeProperties.java @@ -18,72 +18,72 @@ package org.apache.falcon.util; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicReference; - import org.apache.falcon.FalconException; import org.apache.log4j.Logger; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; + public class RuntimeProperties extends ApplicationProperties { - private static Logger LOG = Logger.getLogger(RuntimeProperties.class); + private static Logger LOG = Logger.getLogger(RuntimeProperties.class); - private static final String PROPERTY_FILE = "runtime.properties"; + private static final String PROPERTY_FILE = "runtime.properties"; - private static final AtomicReference instance = - new AtomicReference(); + private static final AtomicReference instance = + new AtomicReference(); - private RuntimeProperties() throws FalconException { - super(); - Thread refreshThread = new Thread(new DynamicLoader(this)); - refreshThread.start(); - } + private RuntimeProperties() throws FalconException { + super(); + Thread refreshThread = new Thread(new DynamicLoader(this)); + refreshThread.start(); + } - @Override - protected String getPropertyFile() { - return PROPERTY_FILE; - } + @Override + protected String getPropertyFile() { + return PROPERTY_FILE; + } - public static Properties get() { - try { - if (instance.get() == null) { - instance.compareAndSet(null, new RuntimeProperties()); - } - return instance.get(); - } catch (FalconException e) { - throw new RuntimeException("Unable to read application " + - "runtime properties", e); + public static Properties get() { + try { + if (instance.get() == null) { + instance.compareAndSet(null, new RuntimeProperties()); + } + return instance.get(); + } catch (FalconException e) { + throw new RuntimeException("Unable to read application " + + "runtime properties", e); + } } - } - private class DynamicLoader implements Runnable { + private class DynamicLoader implements Runnable { - private static final long REFRESH_DELAY = 300000L; - private static final int MAX_ITER = 20; //1hr - private final ApplicationProperties applicationProperties; + private static final long REFRESH_DELAY = 300000L; + private static final int MAX_ITER = 20; //1hr + private final ApplicationProperties applicationProperties; - private DynamicLoader(ApplicationProperties applicationProperties) { - this.applicationProperties = applicationProperties; - } + private DynamicLoader(ApplicationProperties applicationProperties) { + this.applicationProperties = applicationProperties; + } - @Override - public void run() { - long backOffDelay = REFRESH_DELAY; - while (true) { - try { - try { - applicationProperties.loadProperties(); - backOffDelay = REFRESH_DELAY; - } catch (FalconException e) { - LOG.warn("Error refreshing runtime properties", e); - backOffDelay += REFRESH_DELAY; - } - Thread.sleep(Math.min(MAX_ITER * REFRESH_DELAY, backOffDelay)); - } catch (InterruptedException e) { - LOG.info("Application is stopping. Aborting..."); - break; + @Override + public void run() { + long backOffDelay = REFRESH_DELAY; + while (true) { + try { + try { + applicationProperties.loadProperties(); + backOffDelay = REFRESH_DELAY; + } catch (FalconException e) { + LOG.warn("Error refreshing runtime properties", e); + backOffDelay += REFRESH_DELAY; + } + Thread.sleep(Math.min(MAX_ITER * REFRESH_DELAY, backOffDelay)); + } catch (InterruptedException e) { + LOG.info("Application is stopping. Aborting..."); + break; + } + } } - } } - } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/util/StartupProperties.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/StartupProperties.java b/common/src/main/java/org/apache/falcon/util/StartupProperties.java index e264758..4a19df4 100644 --- a/common/src/main/java/org/apache/falcon/util/StartupProperties.java +++ b/common/src/main/java/org/apache/falcon/util/StartupProperties.java @@ -18,36 +18,36 @@ package org.apache.falcon.util; +import org.apache.falcon.FalconException; + import java.util.Properties; import java.util.concurrent.atomic.AtomicReference; -import org.apache.falcon.FalconException; - public class StartupProperties extends ApplicationProperties { - private static final String PROPERTY_FILE = "startup.properties"; - - private static final AtomicReference instance = - new AtomicReference(); - - private StartupProperties() throws FalconException { - super(); - } - - @Override - protected String getPropertyFile() { - return PROPERTY_FILE; - } - - public static Properties get() { - try { - if (instance.get() == null) { - instance.compareAndSet(null, new StartupProperties()); - } - return instance.get(); - } catch (FalconException e) { - throw new RuntimeException("Unable to read application " + - "startup properties", e); + private static final String PROPERTY_FILE = "startup.properties"; + + private static final AtomicReference instance = + new AtomicReference(); + + private StartupProperties() throws FalconException { + super(); + } + + @Override + protected String getPropertyFile() { + return PROPERTY_FILE; + } + + public static Properties get() { + try { + if (instance.get() == null) { + instance.compareAndSet(null, new StartupProperties()); + } + return instance.get(); + } catch (FalconException e) { + throw new RuntimeException("Unable to read application " + + "startup properties", e); + } } - } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java index 9b75327..76a9edc 100644 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java +++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java @@ -18,15 +18,15 @@ package org.apache.falcon.workflow; +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.util.ReflectionUtils; + import java.util.Date; import java.util.List; import java.util.Map; import java.util.Properties; -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.util.ReflectionUtils; - public abstract class WorkflowBuilder { public static WorkflowBuilder getBuilder(String engine, Entity entity) throws FalconException { @@ -35,8 +35,9 @@ public abstract class WorkflowBuilder { } public abstract Map newWorkflowSchedule(T entity, List clusters) throws FalconException; - - public abstract Properties newWorkflowSchedule(T entity, Date startDate, String clusterName, String user) throws FalconException; + + public abstract Properties newWorkflowSchedule(T entity, Date startDate, String clusterName, String user) + throws FalconException; public abstract String[] getWorkflowNames(T entity); } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java index 5880f68..a267e39 100644 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java +++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java @@ -27,11 +27,12 @@ public class WorkflowEngineFactory { private static final String WORKFLOW_ENGINE = "workflow.engine.impl"; - private WorkflowEngineFactory() { } + private WorkflowEngineFactory() { + } - public static AbstractWorkflowEngine getWorkflowEngine() + public static AbstractWorkflowEngine getWorkflowEngine() throws FalconException { return ReflectionUtils.getInstance(WORKFLOW_ENGINE); - } + } }