From: sriksun@apache.org
To: commits@falcon.incubator.apache.org
Reply-To: dev@falcon.incubator.apache.org
Date: Fri, 26 Apr 2013 15:50:35 -0000
Message-Id: <7ba85f96901a41578bc0f87091a26b11@git.apache.org>
Subject: [19/47] Fixes for Checkstyle

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/client/src/main/resources/feed-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/feed-0.1.xsd b/client/src/main/resources/feed-0.1.xsd
index e0e2204..a2b73fe 100644
--- a/client/src/main/resources/feed-0.1.xsd
+++ b/client/src/main/resources/feed-0.1.xsd

[The XML markup of these hunks was stripped when the message was archived; only the hunk headers and the xs:documentation text survive. The change re-indents the schema and re-wraps attribute lists and over-long documentation lines for Checkstyle's line-length rule. The recoverable documentation text:

  - retention is specified by the expression frequency(times), e.g. if a feed should be retained for at least 6 hours then retention's limit="hours(6)". The field partitionExp contains partition tags; the number of partition tags has to be equal to the number of partitions specified in the feed schema. A partition tag can be a wildcard (*), a static string or an expression, and at least one of the strings has to be an expression.
  - schema captures the schema definition for a feed and the provider of the schema, like protobuf, thrift etc.
  - late-arrival declares a cut-off, e.g. if late data is expected for up to 8 hours then late-arrival's cut-off="hours(8)".
  - locations name the data paths produced periodically, e.g. type="data" path="/projects/TrafficHourly/${YEAR}-${MONTH}-${DAY}/traffic".
  - ACL is the access control list for this feed.
  - lifecycle actions include archive, delete, chown and chmod.]

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/client/src/main/resources/jaxb-binding.xjb
----------------------------------------------------------------------
diff --git a/client/src/main/resources/jaxb-binding.xjb b/client/src/main/resources/jaxb-binding.xjb
index 7e4cb12..93e72af 100644
--- a/client/src/main/resources/jaxb-binding.xjb
+++ b/client/src/main/resources/jaxb-binding.xjb

[The binding markup was likewise stripped. The change re-indents the root jaxb:bindings element (jaxb:version="2.1", extensionBindingPrefixes "xjc jaxb xs inheritance annox", and the jaxb, xjc, inheritance and xs namespace declarations) and the per-schema inheritance bindings that make each generated entity class extend org.apache.falcon.entity.v0.Entity; the bindings themselves appear unchanged.]

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/client/src/main/resources/process-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/process-0.1.xsd b/client/src/main/resources/process-0.1.xsd
index e322353..1368d85 100644
--- a/client/src/main/resources/process-0.1.xsd
+++ b/client/src/main/resources/process-0.1.xsd

[As above, only hunk headers and documentation text survive; the change re-wraps long documentation and attribute lines. The recoverable documentation text:

  - inputs defines the inputs for the workflow. The workflow will run only when the scheduled time is up and all the inputs are available.
  - workflow defines the workflow that should run, defined with respect to the workflow specification of the workflow engine. Only the Oozie workflow engine is supported as of now. The workflow path is the path on HDFS which contains the workflow XML.
  - retry defines how to handle workflow failures. The policy type (backoff, exponential backoff) along with the delay defines how frequently the workflow should be retried; the number of attempts defines how many times to retry the failures.
  - late-process defines how late data should be handled. The late policy (backoff, exponential backoff, final) along with the delay defines how frequently Falcon should check for late data; late data handling can be customized for each input separately.
  - cluster defines the cluster where the workflow should run and, in addition, the validity of the workflow on this cluster as start and end time.
  - input: each input maps to a feed; the input path and frequency are picked from the feed definition. The input specifies the start and end instance for the workflow. Falcon creates a property with the input name which contains the paths of all input instances between start and end; this property is available for the workflow to read its inputs. An input can also optionally specify the specific partition of the feed that the workflow needs.
  - output: each output maps to a feed; the output path and frequency are picked from the corresponding feed definition. The output also specifies the instance that is created, in terms of an EL expression. For each output, Falcon creates a property with the output name which can be used in workflows.]
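[Editorial note: the inheritance binding above is what lets the cleanup handlers further down treat feeds and processes uniformly. A minimal sketch of how it plays out at unmarshalling time; the generated org.apache.falcon.entity.v0.feed.Feed class and the uri:falcon:feed:0.1 target namespace are assumptions here, and the XML fragment is illustrative rather than a complete feed definition:

import java.io.StringReader;

import javax.xml.bind.JAXBContext;
import javax.xml.bind.Unmarshaller;

import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.feed.Feed;

public class BindingSketch {
    public static void main(String[] args) throws Exception {
        // Illustrative fragment only; a real feed carries the elements described above.
        String xml = "<feed name='TrafficHourly' description='hourly traffic'"
                + " xmlns='uri:falcon:feed:0.1'/>";

        JAXBContext ctx = JAXBContext.newInstance(Feed.class);
        Unmarshaller unmarshaller = ctx.createUnmarshaller();

        // Because of the inheritance binding, the generated Feed extends Entity,
        // so entity-agnostic code can work against the common supertype.
        Entity entity = (Entity) unmarshaller.unmarshal(new StringReader(xml));
        System.out.println(entity.getName());
    }
}]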
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/client/src/test/java/org/apache/falcon/cli/TestCLIParser.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/falcon/cli/TestCLIParser.java b/client/src/test/java/org/apache/falcon/cli/TestCLIParser.java
index 943acba..a52da32 100644
--- a/client/src/test/java/org/apache/falcon/cli/TestCLIParser.java
+++ b/client/src/test/java/org/apache/falcon/cli/TestCLIParser.java
@@ -26,41 +26,41 @@ import org.testng.annotations.Test;

 public class TestCLIParser {

-    @Test
-    public void testEmptyParser() throws Exception {
-        try {
-            CLIParser parser = new CLIParser("falcon", new String[] {});
-            CLIParser.Command c = parser.parse(new String[] { "a" });
-            Assert.fail();
-        } catch (ParseException ex) {
-            // nop
-        }
-    }
+    @Test
+    public void testEmptyParser() throws Exception {
+        try {
+            CLIParser parser = new CLIParser("falcon", new String[]{});
+            CLIParser.Command c = parser.parse(new String[]{"a"});
+            Assert.fail();
+        } catch (ParseException ex) {
+            // nop
+        }
+    }

-    @Test
-    public void testCommandParser() throws Exception {
-        try {
-            CLIParser parser = new CLIParser("oozie", new String[] {});
-            parser.addCommand("a", "", "AAAAA", new Options(), false);
-            CLIParser.Command c = parser.parse(new String[] { "a", "b" });
-            Assert.assertEquals("a", c.getName());
-            Assert.assertEquals("b", c.getCommandLine().getArgs()[0]);
-        } catch (ParseException ex) {
-            Assert.fail();
-        }
-    }
+    @Test
+    public void testCommandParser() throws Exception {
+        try {
+            CLIParser parser = new CLIParser("oozie", new String[]{});
+            parser.addCommand("a", "", "AAAAA", new Options(), false);
+            CLIParser.Command c = parser.parse(new String[]{"a", "b"});
+            Assert.assertEquals("a", c.getName());
+            Assert.assertEquals("b", c.getCommandLine().getArgs()[0]);
+        } catch (ParseException ex) {
+            Assert.fail();
+        }
+    }

-    @Test
-    public void testCommandParserX() throws Exception {
-        Option opt = new Option("o", false, "O");
-        Options opts = new Options();
-        opts.addOption(opt);
-        CLIParser parser = new CLIParser("test", new String[] {});
-        parser.addCommand("c", "-X ",
-                "(everything after '-X' are pass-through parameters)", opts,
-                true);
-        CLIParser.Command c = parser.parse("c -o -X -o c".split(" "));
-        Assert.assertEquals("-X", c.getCommandLine().getArgList().get(0));
-        Assert.assertEquals(3, c.getCommandLine().getArgList().size());
-    }
+    @Test
+    public void testCommandParserX() throws Exception {
+        Option opt = new Option("o", false, "O");
+        Options opts = new Options();
+        opts.addOption(opt);
+        CLIParser parser = new CLIParser("test", new String[]{});
+        parser.addCommand("c", "-X ",
+                "(everything after '-X' are pass-through parameters)", opts,
+                true);
+        CLIParser.Command c = parser.parse("c -o -X -o c".split(" "));
+        Assert.assertEquals("-X", c.getCommandLine().getArgList().get(0));
+        Assert.assertEquals(3, c.getCommandLine().getArgList().size());
+    }
 }
{"2020-01-32T00:00Z"}, - new Object[] {"2010-13-1T00:00Z"}, new Object[] {"1820-01-01T00:00Z"}, - new Object[] {"2007-2-29T00:00Z"}, new Object[] {"2007-02-29T00:00Z"}, - new Object[] {"2008-2-30T00:00Z"}, new Object[] {"2008-02-31T00:00Z"}, - new Object[] {"2008-a-29T00:00Z"}, new Object[] {"2008-02aT00:00Z"}, - new Object[] {"2008-2-333T00:00Z"}, new Object[] {"200a-02-29T00:00Z"}, - new Object[] {"2010-4-31T00:00Z"}, new Object[] {"2010-04-31T00:00Z"}, - new Object[] {"2010-6-31T00:00Z"}, new Object[] {"2010-06-31T00:00Z"}, - new Object[] {"2010-9-31T00:00Z"}, new Object[] {"2010-09-31T00:00Z"}, - new Object[] {"2010-11-31T00:00Z"},new Object[] {"1999-04-31T01:00Z"}, - }; - } - - @Test(dataProvider = "ValidDateProvider") - public void ValidDateTest(String date) { - boolean valid = DateValidator.validate(date); - System.out.println("Date is valid : " + date + " , " + valid); - Assert.assertEquals(valid, true); - } - - @Test(dataProvider = "InvalidDateProvider", - dependsOnMethods="ValidDateTest") - public void InValidDateTest(String date) { - boolean valid = DateValidator.validate(date); - System.out.println("Date is valid : " + date + " , " + valid); - Assert.assertEquals(valid, false); - } + + @DataProvider + public Object[][] ValidDateProvider() { + return new Object[][]{ + new Object[]{"2011-11-01T00:00Z"}, new Object[]{"2020-01-01T00:00Z"}, + new Object[]{"2010-01-31T00:59Z"}, new Object[]{"2020-01-31T00:00Z"}, + new Object[]{"2008-02-29T01:00Z"}, new Object[]{"2008-02-29T00:00Z"}, + new Object[]{"2009-02-28T01:01Z"}, new Object[]{"2009-02-28T00:00Z"}, + new Object[]{"2010-03-31T23:00Z"}, new Object[]{"2010-03-31T00:00Z"}, + new Object[]{"2010-04-30T23:59Z"}, new Object[]{"2010-04-30T00:00Z"}, + new Object[]{"2010-05-31T23:23Z"}, new Object[]{"2010-05-31T00:00Z"}, + new Object[]{"2010-06-30T00:00Z"}, new Object[]{"2010-06-30T00:00Z"}, + new Object[]{"2010-07-31T00:00Z"}, new Object[]{"2010-07-31T00:00Z"}, + new Object[]{"2010-08-31T00:00Z"}, new Object[]{"2010-08-31T00:00Z"}, + new Object[]{"2010-09-30T00:00Z"}, new Object[]{"2010-09-30T00:00Z"}, + new Object[]{"2010-10-31T00:00Z"}, new Object[]{"2010-10-31T00:00Z"}, + new Object[]{"2010-11-30T00:00Z"}, new Object[]{"2010-11-30T00:00Z"}, + new Object[]{"2010-12-31T00:00Z"}, new Object[]{"2010-12-31T00:00Z"}, + new Object[]{"1999-01-30T01:00Z"}, new Object[]{"2999-12-31T00:00Z"} + }; + } + + @DataProvider + public Object[][] InvalidDateProvider() { + return new Object[][]{ + new Object[]{"2010-12-31T00:60Z"}, new Object[]{"2010-12-31T24:00Z"}, + new Object[]{"2010-01-32T00:00Z"}, new Object[]{"2020-01-32T00:00Z"}, + new Object[]{"2010-13-1T00:00Z"}, new Object[]{"1820-01-01T00:00Z"}, + new Object[]{"2007-2-29T00:00Z"}, new Object[]{"2007-02-29T00:00Z"}, + new Object[]{"2008-2-30T00:00Z"}, new Object[]{"2008-02-31T00:00Z"}, + new Object[]{"2008-a-29T00:00Z"}, new Object[]{"2008-02aT00:00Z"}, + new Object[]{"2008-2-333T00:00Z"}, new Object[]{"200a-02-29T00:00Z"}, + new Object[]{"2010-4-31T00:00Z"}, new Object[]{"2010-04-31T00:00Z"}, + new Object[]{"2010-6-31T00:00Z"}, new Object[]{"2010-06-31T00:00Z"}, + new Object[]{"2010-9-31T00:00Z"}, new Object[]{"2010-09-31T00:00Z"}, + new Object[]{"2010-11-31T00:00Z"}, new Object[]{"1999-04-31T01:00Z"}, + }; + } + + @Test(dataProvider = "ValidDateProvider") + public void ValidDateTest(String date) { + boolean valid = DateValidator.validate(date); + System.out.println("Date is valid : " + date + " , " + valid); + Assert.assertEquals(valid, true); + } + + @Test(dataProvider = "InvalidDateProvider", 
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/FalconException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/FalconException.java b/common/src/main/java/org/apache/falcon/FalconException.java
index 1e8b69b..391700e 100644
--- a/common/src/main/java/org/apache/falcon/FalconException.java
+++ b/common/src/main/java/org/apache/falcon/FalconException.java
@@ -19,34 +19,30 @@ package org.apache.falcon;

 /**
  * Common Exception thrown from Parsers. To the Falcon API class
- *
  */
 public class FalconException extends Exception {

-    /**
-     *
-     * @param e Exception
-     */
-    public FalconException(Throwable e) {
-        super(e);
-    }
+    /**
+     * @param e Exception
+     */
+    public FalconException(Throwable e) {
+        super(e);
+    }

-    public FalconException(String message, Throwable e) {
-        super(message, e);
-    }
+    public FalconException(String message, Throwable e) {
+        super(message, e);
+    }

-    /**
-     *
-     * @param message
-     *            - custom exception message
-     */
-    public FalconException(String message) {
-        super(message);
-    }
+    /**
+     * @param message - custom exception message
+     */
+    public FalconException(String message) {
+        super(message);
+    }

-    /**
-     *
-     */
-    private static final long serialVersionUID = -1475818869309247014L;
+    /**
+     *
+     */
+    private static final long serialVersionUID = -1475818869309247014L;
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/FalconRuntimException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/FalconRuntimException.java b/common/src/main/java/org/apache/falcon/FalconRuntimException.java
index d25043a..b603fbf 100644
--- a/common/src/main/java/org/apache/falcon/FalconRuntimException.java
+++ b/common/src/main/java/org/apache/falcon/FalconRuntimException.java
@@ -19,26 +19,25 @@ package org.apache.falcon;

 /**
- *Runtime Execption class for Falcon application
- *
+ * Runtime Execption class for Falcon application
  */
-public class FalconRuntimException extends RuntimeException{
+public class FalconRuntimException extends RuntimeException {
+
+    /**
+     *
+     */
+    private static final long serialVersionUID = 1105135528999858955L;

-    /**
-     *
-     */
-    private static final long serialVersionUID = 1105135528999858955L;
+    public FalconRuntimException(Exception e) {
+        super(e);
+    }

-    public FalconRuntimException(Exception e) {
-        super(e);
-    }
+    public FalconRuntimException(String message) {
+        super(message);
+    }

-    public FalconRuntimException(String message) {
-        super(message);
-    }
-
-    public FalconRuntimException(String message, Throwable e) {
-        super(message);
-    }
+    public FalconRuntimException(String message, Throwable e) {
+        super(message, e);
+    }
 }
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
index 8fbe951..264ea28 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
@@ -17,16 +17,7 @@
  */
 package org.apache.falcon.cleanup;

-import java.io.IOException;
-
-import javax.servlet.jsp.el.ELException;
-import javax.servlet.jsp.el.ExpressionEvaluator;
-
 import org.apache.commons.el.ExpressionEvaluatorImpl;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.store.ConfigurationStore;
@@ -37,110 +28,118 @@ import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;

+import javax.servlet.jsp.el.ELException;
+import javax.servlet.jsp.el.ExpressionEvaluator;
+import java.io.IOException;
+
 public abstract class AbstractCleanupHandler {

-    protected static final Logger LOG = Logger
-            .getLogger(AbstractCleanupHandler.class);
-    protected final ConfigurationStore STORE = ConfigurationStore.get();
-    public static final ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl();
-    public static final ExpressionHelper resolver = ExpressionHelper.get();
-
-    protected long getRetention(Entity entity, TimeUnit timeUnit)
-            throws FalconException {
-        String retention = getRetentionValue(timeUnit);
-        try {
-            return (Long) EVALUATOR.evaluate("${" + retention + "}",
-                    Long.class, resolver, resolver);
-        } catch (ELException e) {
-            throw new FalconException("Unable to evalue retention limit: "
-                    + retention + " for entity: " + entity.getName());
-        }
-    }
-
-    private String getRetentionValue(Frequency.TimeUnit timeunit) {
-        return RuntimeProperties.get().getProperty(
-                "log.cleanup.frequency." + timeunit + ".retention", "days(1)");
-
-    }
-
-    protected FileStatus[] getAllLogs(
-            org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity)
-            throws FalconException {
-        String stagingPath = ClusterHelper.getLocation(cluster, "staging");
-        Path logPath = getLogPath(entity, stagingPath);
-        FileSystem fs = getFileSystem(cluster);
-        FileStatus[] paths;
-        try {
-            paths = fs.globStatus(logPath);
-        } catch (IOException e) {
-            throw new FalconException(e);
-        }
-        return paths;
-    }
-
-    private FileSystem getFileSystem(
-            org.apache.falcon.entity.v0.cluster.Cluster cluster)
-            throws FalconException {
-
-        FileSystem fs;
-        try {
-            fs = new Path(ClusterHelper.getStorageUrl(cluster))
-                    .getFileSystem(new Configuration());
-        } catch (IOException e) {
-            throw new FalconException(e);
-        }
-        return fs;
-    }
-
-    protected void delete(Cluster cluster, Entity entity, long retention)
-            throws FalconException {
-
-        FileStatus[] logs = getAllLogs(cluster, entity);
-        long now = System.currentTimeMillis();
-
-        for (FileStatus log : logs) {
-            if (now - log.getModificationTime() > retention) {
-                try {
-                    boolean isDeleted = getFileSystem(cluster).delete(
-                            log.getPath(), true);
-                    if (isDeleted == false) {
-                        LOG.error("Unable to delete path: " + log.getPath());
-                    } else {
-                        LOG.info("Deleted path: " + log.getPath());
-                    }
-                    deleteParentIfEmpty(getFileSystem(cluster),log.getPath().getParent());
-                } catch (IOException e) {
-                    throw new FalconException(" Unable to delete log file : "
-                            + log.getPath() + " for entity " + entity.getName()
-                            + " for cluster: " + cluster.getName(), e);
-                }
-            } else {
-                LOG.info("Retention limit: " + retention
-                        + " is less than modification"
-                        + (now - log.getModificationTime()) + " for path: "
-                        + log.getPath());
-            }
-        }
-
-    }
-
-    private void deleteParentIfEmpty(FileSystem fs, Path parent) throws IOException {
-        FileStatus[] files = fs.listStatus(parent);
-        if(files!=null && files.length==0){
-            LOG.info("Parent path: "+parent+ " is empty, deleting path");
-            fs.delete(parent, true);
-            deleteParentIfEmpty(fs,parent.getParent());
-        }
-
-    }
-
-    public abstract void cleanup() throws FalconException;
-
-    protected abstract Path getLogPath(Entity entity, String stagingPath);
-
-    protected String getCurrentColo(){
-        return StartupProperties.get().getProperty("current.colo", "default");
-    }
+    protected static final Logger LOG = Logger
+            .getLogger(AbstractCleanupHandler.class);
+    protected final ConfigurationStore STORE = ConfigurationStore.get();
+    public static final ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl();
+    public static final ExpressionHelper resolver = ExpressionHelper.get();
+
+    protected long getRetention(Entity entity, TimeUnit timeUnit)
+            throws FalconException {
+        String retention = getRetentionValue(timeUnit);
+        try {
+            return (Long) EVALUATOR.evaluate("${" + retention + "}",
+                    Long.class, resolver, resolver);
+        } catch (ELException e) {
+            throw new FalconException("Unable to evalue retention limit: "
+                    + retention + " for entity: " + entity.getName());
+        }
+    }
+
+    private String getRetentionValue(Frequency.TimeUnit timeunit) {
+        return RuntimeProperties.get().getProperty(
+                "log.cleanup.frequency." + timeunit + ".retention", "days(1)");
+
+    }
+
+    protected FileStatus[] getAllLogs(
+            org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity)
+            throws FalconException {
+        String stagingPath = ClusterHelper.getLocation(cluster, "staging");
+        Path logPath = getLogPath(entity, stagingPath);
+        FileSystem fs = getFileSystem(cluster);
+        FileStatus[] paths;
+        try {
+            paths = fs.globStatus(logPath);
+        } catch (IOException e) {
+            throw new FalconException(e);
+        }
+        return paths;
+    }
+
+    private FileSystem getFileSystem(
+            org.apache.falcon.entity.v0.cluster.Cluster cluster)
+            throws FalconException {
+
+        FileSystem fs;
+        try {
+            fs = new Path(ClusterHelper.getStorageUrl(cluster))
+                    .getFileSystem(new Configuration());
+        } catch (IOException e) {
+            throw new FalconException(e);
+        }
+        return fs;
+    }
+
+    protected void delete(Cluster cluster, Entity entity, long retention)
+            throws FalconException {
+
+        FileStatus[] logs = getAllLogs(cluster, entity);
+        long now = System.currentTimeMillis();
+
+        for (FileStatus log : logs) {
+            if (now - log.getModificationTime() > retention) {
+                try {
+                    boolean isDeleted = getFileSystem(cluster).delete(
+                            log.getPath(), true);
+                    if (isDeleted == false) {
+                        LOG.error("Unable to delete path: " + log.getPath());
+                    } else {
+                        LOG.info("Deleted path: " + log.getPath());
+                    }
+                    deleteParentIfEmpty(getFileSystem(cluster), log.getPath().getParent());
+                } catch (IOException e) {
+                    throw new FalconException(" Unable to delete log file : "
+                            + log.getPath() + " for entity " + entity.getName()
+                            + " for cluster: " + cluster.getName(), e);
+                }
+            } else {
+                LOG.info("Retention limit: " + retention
+                        + " is less than modification"
+                        + (now - log.getModificationTime()) + " for path: "
+                        + log.getPath());
+            }
+        }
+
+    }
+
+    private void deleteParentIfEmpty(FileSystem fs, Path parent) throws IOException {
+        FileStatus[] files = fs.listStatus(parent);
+        if (files != null && files.length == 0) {
+            LOG.info("Parent path: " + parent + " is empty, deleting path");
+            fs.delete(parent, true);
+            deleteParentIfEmpty(fs, parent.getParent());
+        }
+
+    }
+
+    public abstract void cleanup() throws FalconException;
+
+    protected abstract Path getLogPath(Entity entity, String stagingPath);
+
+    protected String getCurrentColo() {
+        return StartupProperties.get().getProperty("current.colo", "default");
+    }
 }
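[Editorial note: the retention property is resolved through the EL evaluator, so with log.cleanup.frequency.hours.retention=hours(6) the call getRetention() evaluates "${hours(6)}" to a duration in milliseconds, and delete() removes any log whose age exceeds it. A self-contained sketch of that arithmetic; the EL plumbing is Falcon's own, and the numbers here are assumed purely for illustration:

import java.util.concurrent.TimeUnit;

public class RetentionCheckSketch {

    // Mirrors the comparison in AbstractCleanupHandler.delete().
    static boolean isPastRetention(long modificationTime, long retentionMillis) {
        return System.currentTimeMillis() - modificationTime > retentionMillis;
    }

    public static void main(String[] args) {
        long retention = TimeUnit.HOURS.toMillis(6);   // what "${hours(6)}" evaluates to
        long modifiedEightHoursAgo = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(8);

        System.out.println(isPastRetention(modifiedEightHoursAgo, retention));  // true: delete
    }
}]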
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
index 6b7f34e..37fc653 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
@@ -17,48 +17,47 @@
  */
 package org.apache.falcon.cleanup;

-import java.util.Collection;
-
-import org.apache.hadoop.fs.Path;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.util.DeploymentUtil;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Collection;

 public class FeedCleanupHandler extends AbstractCleanupHandler {

-    @Override
-    public void cleanup() throws FalconException {
-        Collection<String> feeds = STORE.getEntities(EntityType.FEED);
-        for (String feedName : feeds) {
-            Feed feed;
-            feed = STORE.get(EntityType.FEED, feedName);
-            long retention = getRetention(feed, feed.getFrequency()
-                    .getTimeUnit());
-            for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed
-                    .getClusters().getClusters()) {
-                Cluster currentCluster = STORE.get(EntityType.CLUSTER,
-                        cluster.getName());
-                if(currentCluster.getColo().equals(getCurrentColo())){
-                    LOG.info("Cleaning up logs for process:" + feedName
-                            + " in cluster: " + cluster.getName() + " with retention: "+retention);
-                    delete(currentCluster, feed, retention);
-                }else{
-                    LOG.info("Ignoring cleanup for process:" + feedName
-                            + " in cluster: " + cluster.getName()+ " as this does not belong to current colo" );
-                }
-            }
-
-        }
-    }
+    @Override
+    public void cleanup() throws FalconException {
+        Collection<String> feeds = STORE.getEntities(EntityType.FEED);
+        for (String feedName : feeds) {
+            Feed feed;
+            feed = STORE.get(EntityType.FEED, feedName);
+            long retention = getRetention(feed, feed.getFrequency()
+                    .getTimeUnit());
+            for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed
+                    .getClusters().getClusters()) {
+                Cluster currentCluster = STORE.get(EntityType.CLUSTER,
+                        cluster.getName());
+                if (currentCluster.getColo().equals(getCurrentColo())) {
+                    LOG.info("Cleaning up logs for process:" + feedName
+                            + " in cluster: " + cluster.getName() + " with retention: " + retention);
+                    delete(currentCluster, feed, retention);
+                } else {
+                    LOG.info("Ignoring cleanup for process:" + feedName
+                            + " in cluster: " + cluster.getName() + " as this does not belong to current colo");
+                }
+            }
+
+        }
+    }

-    @Override
-    protected Path getLogPath(Entity entity, String stagingPath) {
-        Path logPath = new Path(stagingPath, "falcon/workflows/feed/"
-                + entity.getName() + "/logs/job-*/*/*");
-        return logPath;
-    }
+    @Override
+    protected Path getLogPath(Entity entity, String stagingPath) {
+        Path logPath = new Path(stagingPath, "falcon/workflows/feed/"
+                + entity.getName() + "/logs/job-*/*/*");
+        return logPath;
+    }
 }
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java
index a7f0852..9b96879 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java
@@ -17,48 +17,47 @@
  */
 package org.apache.falcon.cleanup;

-import java.util.Collection;
-
-import org.apache.hadoop.fs.Path;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.util.DeploymentUtil;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Collection;

 public class ProcessCleanupHandler extends AbstractCleanupHandler {

-    @Override
-    public void cleanup() throws FalconException {
-        Collection<String> processes = STORE.getEntities(EntityType.PROCESS);
-        for (String processName : processes) {
-            Process process;
-            process = STORE.get(EntityType.PROCESS, processName);
-            long retention = getRetention(process, process.getFrequency()
-                    .getTimeUnit());
-            for (org.apache.falcon.entity.v0.process.Cluster cluster : process
-                    .getClusters().getClusters()) {
-                Cluster currentCluster = STORE.get(EntityType.CLUSTER,
-                        cluster.getName());
-                if(currentCluster.getColo().equals(getCurrentColo())){
-                    LOG.info("Cleaning up logs for process:" + processName
-                            + " in cluster: " + cluster.getName() + " with retention: "+retention);
-                    delete(currentCluster, process, retention);
-                }else{
-                    LOG.info("Ignoring cleanup for process:" + processName
-                            + " in cluster: " + cluster.getName()+ " as this does not belong to current colo" );
-                }
-            }
-
-        }
-    }
+    @Override
+    public void cleanup() throws FalconException {
+        Collection<String> processes = STORE.getEntities(EntityType.PROCESS);
+        for (String processName : processes) {
+            Process process;
+            process = STORE.get(EntityType.PROCESS, processName);
+            long retention = getRetention(process, process.getFrequency()
+                    .getTimeUnit());
+            for (org.apache.falcon.entity.v0.process.Cluster cluster : process
+                    .getClusters().getClusters()) {
+                Cluster currentCluster = STORE.get(EntityType.CLUSTER,
+                        cluster.getName());
+                if (currentCluster.getColo().equals(getCurrentColo())) {
+                    LOG.info("Cleaning up logs for process:" + processName
+                            + " in cluster: " + cluster.getName() + " with retention: " + retention);
+                    delete(currentCluster, process, retention);
+                } else {
+                    LOG.info("Ignoring cleanup for process:" + processName
+                            + " in cluster: " + cluster.getName() + " as this does not belong to current colo");
+                }
+            }
+
+        }
+    }

-    @Override
-    protected Path getLogPath(Entity entity, String stagingPath) {
-        Path logPath = new Path(stagingPath, "falcon/workflows/process/"
-                + entity.getName() + "/logs/job-*/*");
-        return logPath;
-    }
+    @Override
+    protected Path getLogPath(Entity entity, String stagingPath) {
+        Path logPath = new Path(stagingPath, "falcon/workflows/process/"
+                + entity.getName() + "/logs/job-*/*");
+        return logPath;
+    }
 }
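[Editorial note: the two handlers differ only in entity type and log path, so a caller can drive them through the abstract base type. A hypothetical driver; the scheduling service that actually invokes cleanup() is not part of this patch:

import org.apache.falcon.FalconException;
import org.apache.falcon.cleanup.AbstractCleanupHandler;
import org.apache.falcon.cleanup.FeedCleanupHandler;
import org.apache.falcon.cleanup.ProcessCleanupHandler;

public class LogCleanupDriverSketch {
    public static void main(String[] args) {
        AbstractCleanupHandler[] handlers = {
                new FeedCleanupHandler(), new ProcessCleanupHandler()
        };
        for (AbstractCleanupHandler handler : handlers) {
            try {
                handler.cleanup();
            } catch (FalconException e) {
                // A real service would log the failure and move on to the next handler.
                e.printStackTrace();
            }
        }
    }
}]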
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
index b40d7b1..3d77887 100644
--- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
@@ -18,27 +18,25 @@
 package org.apache.falcon.entity;

+import org.apache.falcon.entity.v0.cluster.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interface;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.entity.v0.cluster.Location;
-import org.apache.falcon.entity.v0.cluster.Property;

 public final class ClusterHelper {
-    public static final String DEFAULT_BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
+    public static final String DEFAULT_BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";

-    private ClusterHelper() {}
+    private ClusterHelper() {
+    }

     public static Configuration getConfiguration(Cluster cluster) {
         Configuration conf = new Configuration();
         conf.set("fs.default.name", getStorageUrl(cluster));
         conf.set("mapred.job.tracker", getMREndPoint(cluster));
-        if(cluster.getProperties() != null)
-            for(Property prop:cluster.getProperties().getProperties()) {
+        if (cluster.getProperties() != null) {
+            for (Property prop : cluster.getProperties().getProperties()) {
                 conf.set(prop.getName(), prop.getValue());
             }
+        }
         return conf;
     }

@@ -57,51 +55,58 @@ public final class ClusterHelper {
     public static String getMREndPoint(Cluster cluster) {
         return getInterface(cluster, Interfacetype.EXECUTE).getEndpoint();
     }
-
+
     public static String getMessageBrokerUrl(Cluster cluster) {
         return getInterface(cluster, Interfacetype.MESSAGING).getEndpoint();
     }
-
+
     public static String getMessageBrokerImplClass(Cluster cluster) {
-        if(cluster.getProperties() != null)
-            for(Property prop:cluster.getProperties().getProperties())
-                if(prop.getName().equals("brokerImplClass"))
+        if (cluster.getProperties() != null) {
+            for (Property prop : cluster.getProperties().getProperties()) {
+                if (prop.getName().equals("brokerImplClass")) {
                     return prop.getValue();
+                }
+            }
+        }
         return DEFAULT_BROKER_IMPL_CLASS;
     }

     public static Interface getInterface(Cluster cluster, Interfacetype type) {
-        for(Interface interf:cluster.getInterfaces().getInterfaces())
-            if(interf.getType() == type)
+        for (Interface interf : cluster.getInterfaces().getInterfaces()) {
+            if (interf.getType() == type) {
                 return interf;
+            }
+        }
         return null;
     }

     private static String getNormalizedUrl(Cluster cluster, Interfacetype type) {
-        String normalizedUrl = getInterface(cluster, type).getEndpoint();
-        String normalizedPath = new Path(normalizedUrl + "/").toString();
-        return normalizedPath.substring(0, normalizedPath.length() - 1);
+        String normalizedUrl = getInterface(cluster, type).getEndpoint();
+        String normalizedPath = new Path(normalizedUrl + "/").toString();
+        return normalizedPath.substring(0, normalizedPath.length() - 1);
     }

     public static String getCompleteLocation(Cluster cluster, String locationKey) {
         return getStorageUrl(cluster) + "/" + getLocation(cluster, locationKey);
     }
-
+
     public static String getLocation(Cluster cluster, String locationKey) {
-        for(Location loc:cluster.getLocations().getLocations()) {
-            if(loc.getName().equals(locationKey))
+        for (Location loc : cluster.getLocations().getLocations()) {
+            if (loc.getName().equals(locationKey)) {
                 return loc.getPath();
+            }
+        }
+        return null;
+    }
+
+    public static String getPropertyValue(Cluster cluster, String propName) {
+        if (cluster.getProperties() != null) {
+            for (Property prop : cluster.getProperties().getProperties()) {
+                if (prop.getName().equals(propName)) {
+                    return prop.getValue();
+                }
+            }
         }
         return null;
     }
-
-    public static String getPropertyValue(Cluster cluster, String propName) {
-        if (cluster.getProperties() != null) {
-            for (Property prop : cluster.getProperties().getProperties()) {
-                if (prop.getName().equals(propName))
-                    return prop.getValue();
-            }
-        }
-        return null;
-    }
 }
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java b/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java
index e620466..6c5d885 100644
--- a/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java
+++ b/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java
@@ -18,18 +18,19 @@
 package org.apache.falcon.entity;

-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.service.ConfigurationChangeListener;

+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
 public class ColoClusterRelation implements ConfigurationChangeListener {
-    private static final ConcurrentHashMap<String, Set<String>> coloClusterMap = new ConcurrentHashMap<String, Set<String>>();
+    private static final ConcurrentHashMap<String, Set<String>> coloClusterMap
+            = new ConcurrentHashMap<String, Set<String>>();
     private static final ColoClusterRelation instance = new ColoClusterRelation();

     private ColoClusterRelation() {
@@ -40,15 +41,17 @@
     }

     public Set<String> getClusters(String colo) {
-        if (coloClusterMap.containsKey(colo))
+        if (coloClusterMap.containsKey(colo)) {
             return coloClusterMap.get(colo);
+        }
         return new HashSet<String>();
     }

     @Override
     public void onAdd(Entity entity) {
-        if (entity.getEntityType() != EntityType.CLUSTER)
+        if (entity.getEntityType() != EntityType.CLUSTER) {
             return;
+        }

         Cluster cluster = (Cluster) entity;
         coloClusterMap.putIfAbsent(cluster.getColo(), new HashSet<String>());
@@ -57,19 +60,22 @@

     @Override
     public void onRemove(Entity entity) {
-        if (entity.getEntityType() != EntityType.CLUSTER)
+        if (entity.getEntityType() != EntityType.CLUSTER) {
             return;
+        }

         Cluster cluster = (Cluster) entity;
         coloClusterMap.get(cluster.getColo()).remove(cluster.getName());
-        if (coloClusterMap.get(cluster.getColo()).isEmpty())
+        if (coloClusterMap.get(cluster.getColo()).isEmpty()) {
             coloClusterMap.remove(cluster.getColo());
+        }
     }

     @Override
     public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
-        if (oldEntity.getEntityType() != EntityType.CLUSTER)
+        if (oldEntity.getEntityType() != EntityType.CLUSTER) {
             return;
+        }
         throw new FalconException("change shouldn't be supported on cluster!");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/EntityNotRegisteredException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityNotRegisteredException.java b/common/src/main/java/org/apache/falcon/entity/EntityNotRegisteredException.java
index 03b8d23..db922a4 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityNotRegisteredException.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityNotRegisteredException.java
@@ -20,8 +20,8 @@ package org.apache.falcon.entity;

 import org.apache.falcon.FalconException;

-public class EntityNotRegisteredException extends FalconException{
-
+public class EntityNotRegisteredException extends FalconException {
+
     public EntityNotRegisteredException(String message) {
         super(message);
     }
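[Editorial note: for context, typical call sites for the two entity helpers above might look like the following sketch. ColoClusterRelation.get() is an assumption inferred from the singleton instance field, since the accessor itself sits in unchanged context lines of the diff:

import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.ColoClusterRelation;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.hadoop.conf.Configuration;

public class HelperUsageSketch {
    public static void describe(Cluster cluster) {
        // fs.default.name and mapred.job.tracker are filled in from the
        // cluster's storage URL and execute endpoint, as getConfiguration() shows.
        Configuration conf = ClusterHelper.getConfiguration(cluster);
        System.out.println(conf.get("fs.default.name"));

        // Names of all clusters registered in this cluster's colo.
        for (String name : ColoClusterRelation.get().getClusters(cluster.getColo())) {
            System.out.println(name);
        }
    }
}]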