falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject [7/7] falcon git commit: Removing addons/ non-docs directory from asf-site branch
Date Tue, 01 Mar 2016 07:24:21 GMT
Removing addons/ non-docs directory from asf-site branch


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/6f5b476c
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/6f5b476c
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/6f5b476c

Branch: refs/heads/asf-site
Commit: 6f5b476ccd8fa4ff1e9aea36d44a85309a9b932e
Parents: 8609ffd
Author: Pallavi Rao <pallavi.rao@inmobi.com>
Authored: Tue Mar 1 12:54:02 2016 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Tue Mar 1 12:54:02 2016 +0530

----------------------------------------------------------------------
 addons/adf/README                               |  59 --
 addons/adf/pom.xml                              | 112 ---
 .../apache/falcon/adfservice/ADFHiveJob.java    | 123 ----
 .../org/apache/falcon/adfservice/ADFJob.java    | 556 ---------------
 .../apache/falcon/adfservice/ADFJobFactory.java |  43 --
 .../org/apache/falcon/adfservice/ADFPigJob.java |  70 --
 .../falcon/adfservice/ADFProviderService.java   | 370 ----------
 .../falcon/adfservice/ADFReplicationJob.java    |  71 --
 .../falcon/adfservice/ADFScheduledExecutor.java |  71 --
 .../org/apache/falcon/adfservice/DataFeed.java  | 110 ---
 .../java/org/apache/falcon/adfservice/Feed.java |  39 -
 .../org/apache/falcon/adfservice/Process.java   | 148 ----
 .../org/apache/falcon/adfservice/TableFeed.java | 125 ----
 .../adfservice/util/ADFJsonConstants.java       |  73 --
 .../apache/falcon/adfservice/util/FSUtils.java  | 102 ---
 addons/designer/actions/pom.xml                 |  46 --
 .../configuration/EmailActionConfiguration.java |  74 --
 .../designer/primitive/action/EmailAction.java  |  92 ---
 addons/designer/checkstyle/pom.xml              |  28 -
 .../resources/falcon/checkstyle-java-header.txt |  17 -
 .../resources/falcon/checkstyle-noframes.xsl    | 218 ------
 .../src/main/resources/falcon/checkstyle.xml    | 233 ------
 .../main/resources/falcon/findbugs-exclude.xml  |  34 -
 addons/designer/common/pom.xml                  |  42 --
 addons/designer/core/pom.xml                    |  81 ---
 .../configuration/ActionConfiguration.java      |  32 -
 .../designer/configuration/Configuration.java   |  81 ---
 .../designer/configuration/FlowConfig.java      |  69 --
 .../designer/configuration/SerdeException.java  |  61 --
 .../configuration/TransformConfiguration.java   |  33 -
 .../falcon/designer/primitive/Action.java       | 102 ---
 .../apache/falcon/designer/primitive/Code.java  |  27 -
 .../primitive/CompilationException.java         |  60 --
 .../falcon/designer/primitive/Message.java      |  67 --
 .../falcon/designer/primitive/Primitive.java    | 159 -----
 .../falcon/designer/primitive/Transform.java    |  88 ---
 .../falcon/designer/schema/RelationalData.java  |  53 --
 .../designer/schema/RelationalSchema.java       |  84 ---
 .../falcon/designer/source/DataSource.java      |  29 -
 .../apache/falcon/designer/storage/Storage.java |  67 --
 .../designer/storage/StorageException.java      |  63 --
 .../falcon/designer/storage/Storeable.java      |  52 --
 .../apache/falcon/designer/storage/Version.java |  71 --
 .../designer/storage/VersionedStorage.java      | 111 ---
 .../designer/storage/impl/HDFSStorage.java      |  98 ---
 .../designer/storage/impl/HDFSStorageTest.java  |  78 --
 addons/designer/flows/pom.xml                   |  46 --
 .../apache/falcon/designer/primitive/Flow.java  |  83 ---
 addons/designer/pom.xml                         | 709 -------------------
 addons/designer/transforms/pom.xml              |  42 --
 addons/designer/ui/pom.xml                      |  95 ---
 .../designer/ui/src/main/webapp/WEB-INF/web.xml |  49 --
 addons/hivedr/README                            |  80 ---
 addons/hivedr/pom.xml                           | 209 ------
 .../apache/falcon/hive/DefaultPartitioner.java  | 317 ---------
 .../org/apache/falcon/hive/EventSourcer.java    |  31 -
 .../java/org/apache/falcon/hive/HiveDRArgs.java | 122 ----
 .../org/apache/falcon/hive/HiveDROptions.java   | 183 -----
 .../java/org/apache/falcon/hive/HiveDRTool.java | 393 ----------
 .../falcon/hive/LastReplicatedEvents.java       | 196 -----
 .../falcon/hive/MetaStoreEventSourcer.java      | 204 ------
 .../org/apache/falcon/hive/Partitioner.java     |  42 --
 .../falcon/hive/ReplicationEventMetadata.java   |  34 -
 .../exception/HiveReplicationException.java     |  49 --
 .../falcon/hive/mapreduce/CopyCommitter.java    |  65 --
 .../falcon/hive/mapreduce/CopyMapper.java       | 104 ---
 .../falcon/hive/mapreduce/CopyReducer.java      |  85 ---
 .../falcon/hive/util/DBReplicationStatus.java   | 213 ------
 .../apache/falcon/hive/util/DRStatusStore.java  | 104 ---
 .../apache/falcon/hive/util/DelimiterUtils.java |  30 -
 .../falcon/hive/util/EventSourcerUtils.java     | 189 -----
 .../org/apache/falcon/hive/util/EventUtils.java | 393 ----------
 .../org/apache/falcon/hive/util/FileUtils.java  |  68 --
 .../falcon/hive/util/HiveDRStatusStore.java     | 315 --------
 .../apache/falcon/hive/util/HiveDRUtils.java    |  99 ---
 .../falcon/hive/util/HiveMetastoreUtils.java    |  92 ---
 .../falcon/hive/util/ReplicationStatus.java     | 221 ------
 addons/hivedr/src/main/resources/log4j.xml      |  54 --
 .../falcon/hive/DBReplicationStatusTest.java    | 230 ------
 .../java/org/apache/falcon/hive/DRTest.java     |  45 --
 .../falcon/hive/HiveDRStatusStoreTest.java      | 343 ---------
 .../java/org/apache/falcon/hive/HiveDRTest.java | 252 -------
 .../falcon/hive/ReplicationStatusTest.java      | 137 ----
 addons/recipes/hdfs-replication/README.txt      |  29 -
 addons/recipes/hdfs-replication/pom.xml         |  32 -
 .../resources/hdfs-replication-template.xml     |  44 --
 .../resources/hdfs-replication-workflow.xml     |  82 ---
 .../main/resources/hdfs-replication.properties  |  79 ---
 .../recipes/hive-disaster-recovery/README.txt   |  58 --
 addons/recipes/hive-disaster-recovery/pom.xml   |  32 -
 .../hive-disaster-recovery-secure-template.xml  |  45 --
 .../hive-disaster-recovery-secure-workflow.xml  | 357 ----------
 .../hive-disaster-recovery-secure.properties    | 108 ---
 .../hive-disaster-recovery-template.xml         |  45 --
 .../hive-disaster-recovery-workflow.xml         | 249 -------
 .../resources/hive-disaster-recovery.properties |  98 ---
 96 files changed, 11603 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/adf/README
----------------------------------------------------------------------
diff --git a/addons/adf/README b/addons/adf/README
deleted file mode 100644
index 39883b8..0000000
--- a/addons/adf/README
+++ /dev/null
@@ -1,59 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-ADF Provider
-=======================
-
-
-Overview
----------
-
-This integration allows Microsoft Azure Data Factory pipelines to invoke Falcon activities
-(i.e. replication, hive and pig proessing work), so the user can build a hybrid Hadoop data pipelines
-leveraging on-premises Hadoop clusters and cloud based Cortana Analytics services
-like HDInsight Hadoop clusters and Azure Machine Learning.
-
-
-Usage
----------
-
-Falcon reads Azure Service Bus credentials from conf/startup.properties when it starts.
-So, the credential needs to be added before starting Falcon,
-and Falcon needs to be restarted if there is any change in the credential.
-
-Example:
-
-######### ADF Configurations start #########
-
-# A String object that represents the namespace
-*.microsoft.windowsazure.services.servicebus.namespace=hwtransport
-
-# Request and status queues on the namespace
-*.microsoft.windowsazure.services.servicebus.requestqueuename=adfrequest
-*.microsoft.windowsazure.services.servicebus.statusqueuename=adfstatus
-
-# A String object that contains the SAS key name
-*.microsoft.windowsazure.services.servicebus.sasKeyName=RootManageSharedAccessKey
-
-# A String object that contains the SAS key
-*.microsoft.windowsazure.services.servicebus.sasKey=4kt2x6yEoWZZSFZofyXEoxly0knHL7FPMqLD14ov1jo=
-
-# A String object containing the base URI that is added to your Service Bus namespace to form the URI to connect
-# to the Service Bus service. To access the default public Azure service, pass ".servicebus.windows.net"
-*.microsoft.windowsazure.services.servicebus.serviceBusRootUri=.servicebus.windows.net
-
-# Service bus polling frequency (in seconds)
-*.microsoft.windowsazure.services.servicebus.polling.frequency=60

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/adf/pom.xml
----------------------------------------------------------------------
diff --git a/addons/adf/pom.xml b/addons/adf/pom.xml
deleted file mode 100644
index 898791e..0000000
--- a/addons/adf/pom.xml
+++ /dev/null
@@ -1,112 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.falcon</groupId>
-        <artifactId>falcon-main</artifactId>
-        <version>0.10-SNAPSHOT</version>
-        <relativePath>../../pom.xml</relativePath>
-    </parent>
-    <artifactId>falcon-adf</artifactId>
-    <description>Apache Falcon ADF Integration</description>
-    <name>Apache Falcon ADF Integration</name>
-    <packaging>jar</packaging>
-
-    <properties>
-        <azure.version>0.8.0</azure.version>
-    </properties>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.falcon</groupId>
-            <artifactId>falcon-common</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>javax.servlet.jsp</groupId>
-                    <artifactId>jsp-api</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.falcon</groupId>
-            <artifactId>falcon-prism</artifactId>
-            <version>${project.version}</version>
-            <classifier>classes</classifier>
-        </dependency>
-
-        <dependency>
-            <groupId>com.microsoft.azure</groupId>
-            <artifactId>azure-servicebus</artifactId>
-            <version>${azure.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.codehaus.jackson</groupId>
-                    <artifactId>jackson-core-asl</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.codehaus.jackson</groupId>
-                    <artifactId>jackson-mapper-asl</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.httpcomponents</groupId>
-                    <artifactId>httpclient</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-    </dependencies>
-
-    <profiles>
-        <profile>
-            <id>hadoop-2</id>
-            <activation>
-                <activeByDefault>true</activeByDefault>
-            </activation>
-            <dependencies>
-                <dependency>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-client</artifactId>
-                </dependency>
-            </dependencies>
-        </profile>
-    </profiles>
-
-    <build>
-        <sourceDirectory>${basedir}/src/main/java</sourceDirectory>
-        <!--<testSourceDirectory>${basedir}/src/test/java</testSourceDirectory>-->
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-jar-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>test-jar</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFHiveJob.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFHiveJob.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFHiveJob.java
deleted file mode 100644
index 6412c73..0000000
--- a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFHiveJob.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.adfservice;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.adfservice.util.ADFJsonConstants;
-import org.apache.falcon.FalconException;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-/**
- * Azure ADF Hive Job.
- */
-public class ADFHiveJob extends ADFJob {
-    private static final String HIVE_SCRIPT_EXTENSION = ".hql";
-    private static final String ENGINE_TYPE = "hive";
-    private static final String INPUT_FEED_SUFFIX = "-hive-input-feed";
-    private static final String OUTPUT_FEED_SUFFIX = "-hive-output-feed";
-
-    private String hiveScriptPath;
-    private TableFeed inputFeed;
-    private TableFeed outputFeed;
-
-    public ADFHiveJob(String message, String id) throws FalconException {
-        super(message, id);
-        type = JobType.HIVE;
-        inputFeed = getInputTableFeed();
-        outputFeed = getOutputTableFeed();
-        hiveScriptPath = activityHasScriptPath() ? getScriptPath() : createScriptFile(HIVE_SCRIPT_EXTENSION);
-    }
-
-    @Override
-    public void startJob() throws FalconException {
-        startProcess(inputFeed, outputFeed, ENGINE_TYPE, hiveScriptPath);
-    }
-
-    @Override
-    public void cleanup() throws FalconException {
-        cleanupProcess(inputFeed, outputFeed);
-    }
-
-    private TableFeed getInputTableFeed() throws FalconException {
-        return getTableFeed(jobEntityName() + INPUT_FEED_SUFFIX, getInputTables().get(0),
-                getTableCluster(getInputTables().get(0)));
-    }
-
-    private TableFeed getOutputTableFeed() throws FalconException {
-        return getTableFeed(jobEntityName() + OUTPUT_FEED_SUFFIX, getOutputTables().get(0),
-                getTableCluster(getOutputTables().get(0)));
-    }
-
-    private TableFeed getTableFeed(final String feedName, final String tableName,
-                                   final String clusterName) throws FalconException {
-        JSONObject tableExtendedProperties = getTableExtendedProperties(tableName);
-        String tableFeedName;
-        String partitions;
-
-        try {
-            tableFeedName = tableExtendedProperties.getString(ADFJsonConstants.ADF_REQUEST_TABLE_NAME);
-            if (StringUtils.isBlank(tableFeedName)) {
-                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_TABLE_NAME + " cannot"
-                        + " be empty in ADF request.");
-            }
-            partitions = tableExtendedProperties.getString(ADFJsonConstants.ADF_REQUEST_TABLE_PARTITION);
-            if (StringUtils.isBlank(partitions)) {
-                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_TABLE_PARTITION + " cannot"
-                        + " be empty in ADF request.");
-            }
-        } catch (JSONException e) {
-            throw new FalconException("Error while parsing ADF JSON message: " + tableExtendedProperties, e);
-        }
-
-        return new TableFeed.Builder().withFeedName(feedName).withFrequency(frequency)
-                .withClusterName(clusterName).withStartTime(startTime).withEndTime(endTime).
-                        withAclOwner(proxyUser).withTableName(tableFeedName).withPartitions(partitions).build();
-    }
-
-    private JSONObject getTableExtendedProperties(final String tableName) throws FalconException {
-        JSONObject table = tablesMap.get(tableName);
-        if (table == null) {
-            throw new FalconException("JSON object table " + tableName + " not found in ADF request.");
-        }
-
-        try {
-            JSONObject tableProperties = table.getJSONObject(ADFJsonConstants.ADF_REQUEST_PROPERTIES);
-            if (tableProperties == null) {
-                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_PROPERTIES
-                        + " not found in ADF request.");
-            }
-            JSONObject tablesLocation = tableProperties.getJSONObject(ADFJsonConstants.ADF_REQUEST_LOCATION);
-            if (tablesLocation == null) {
-                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_LOCATION
-                        + " not found in ADF request.");
-            }
-
-            JSONObject tableExtendedProperties = tablesLocation.getJSONObject(ADFJsonConstants.
-                    ADF_REQUEST_EXTENDED_PROPERTIES);
-            if (tableExtendedProperties == null) {
-                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_EXTENDED_PROPERTIES
-                        + " not found in ADF request.");
-            }
-            return tableExtendedProperties;
-        } catch (JSONException e) {
-            throw new FalconException("Error while parsing ADF JSON message: " + table, e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFJob.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFJob.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFJob.java
deleted file mode 100644
index 5d81338..0000000
--- a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFJob.java
+++ /dev/null
@@ -1,556 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.adfservice;
-
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Pattern;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.falcon.adfservice.util.ADFJsonConstants;
-import org.apache.falcon.adfservice.util.FSUtils;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.resource.AbstractSchedulableEntityManager;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.hadoop.fs.Path;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base class for Azure ADF jobs.
- */
-public abstract class ADFJob {
-    private static final Logger LOG = LoggerFactory.getLogger(ADFJob.class);
-
-    // name prefix for all adf related entity, e.g. an adf hive process and the feeds associated with it
-    public static final String ADF_ENTITY_NAME_PREFIX = "ADF-";
-    public static final int ADF_ENTITY_NAME_PREFIX_LENGTH = ADF_ENTITY_NAME_PREFIX.length();
-    // name prefix for all adf related job entity, i.e. adf hive/pig process and replication feed
-    public static final String ADF_JOB_ENTITY_NAME_PREFIX = ADF_ENTITY_NAME_PREFIX + "JOB-";
-    public static final int ADF_JOB_ENTITY_NAME_PREFIX_LENGTH = ADF_JOB_ENTITY_NAME_PREFIX.length();
-
-    public static final String TEMPLATE_PATH_PREFIX = "/apps/falcon/adf/";
-    public static final String PROCESS_SCRIPTS_PATH = TEMPLATE_PATH_PREFIX
-            + Path.SEPARATOR + "generatedscripts";
-    private static final String DEFAULT_FREQUENCY = "days(1)";
-
-    public static boolean isADFJobEntity(String entityName) {
-        return entityName.startsWith(ADF_JOB_ENTITY_NAME_PREFIX);
-    }
-
-    public static String getSessionID(String entityName) throws FalconException {
-        if (!isADFJobEntity(entityName)) {
-            throw new FalconException("The entity, " + entityName + ", is not an ADF Job Entity.");
-        }
-        return entityName.substring(ADF_JOB_ENTITY_NAME_PREFIX_LENGTH);
-    }
-
-    /**
-     * Enum for job type.
-     */
-    public static enum JobType {
-        HIVE, PIG, REPLICATION
-    }
-
-    private static enum RequestType {
-        HADOOPMIRROR, HADOOPHIVE, HADOOPPIG
-    }
-
-    public static JobType getJobType(String msg) throws FalconException {
-        try {
-            JSONObject obj = new JSONObject(msg);
-            JSONObject activity = obj.getJSONObject(ADFJsonConstants.ADF_REQUEST_ACTIVITY);
-            if (activity == null) {
-                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_ACTIVITY + " not found in ADF"
-                        + " request.");
-            }
-
-            JSONObject activityProperties = activity.getJSONObject(ADFJsonConstants.ADF_REQUEST_TRANSFORMATION);
-            if (activityProperties == null) {
-                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_TRANSFORMATION + " not found "
-                        + "in ADF request.");
-            }
-
-            String type = activityProperties.getString(ADFJsonConstants.ADF_REQUEST_TYPE);
-            if (StringUtils.isBlank(type)) {
-                throw new FalconException(ADFJsonConstants.ADF_REQUEST_TYPE + " not found in ADF request msg");
-            }
-
-            switch (RequestType.valueOf(type.toUpperCase())) {
-            case HADOOPMIRROR:
-                return JobType.REPLICATION;
-            case HADOOPHIVE:
-                return JobType.HIVE;
-            case HADOOPPIG:
-                return JobType.PIG;
-            default:
-                throw new FalconException("Unrecognized ADF job type: " + type);
-            }
-        } catch (JSONException e) {
-            throw new FalconException("Error while parsing ADF JSON message: " + msg, e);
-        }
-    }
-
-    public abstract void startJob() throws FalconException;
-    public abstract void cleanup() throws FalconException;
-
-    protected JSONObject message;
-    protected JSONObject activity;
-    protected JSONObject activityExtendedProperties;
-    protected String id;
-    protected JobType type;
-    protected String startTime, endTime;
-    protected String frequency;
-    protected String proxyUser;
-    protected long timeout;
-    protected ADFJobManager jobManager = new ADFJobManager();
-
-    private Map<String, JSONObject> linkedServicesMap = new HashMap<String, JSONObject>();
-    protected Map<String, JSONObject> tablesMap = new HashMap<String, JSONObject>();
-
-    public ADFJob(String msg, String id) throws FalconException {
-        this.id = id;
-        FSUtils.createDir(new Path(PROCESS_SCRIPTS_PATH));
-        try {
-            message = new JSONObject(msg);
-
-            frequency = DEFAULT_FREQUENCY;
-            startTime = transformTimeFormat(message.getString(ADFJsonConstants.ADF_REQUEST_START_TIME));
-            endTime = transformTimeFormat(message.getString(ADFJsonConstants.ADF_REQUEST_END_TIME));
-
-            JSONArray linkedServices = message.getJSONArray(ADFJsonConstants.ADF_REQUEST_LINKED_SERVICES);
-            for (int i = 0; i < linkedServices.length(); i++) {
-                JSONObject linkedService = linkedServices.getJSONObject(i);
-                linkedServicesMap.put(linkedService.getString(ADFJsonConstants.ADF_REQUEST_NAME), linkedService);
-            }
-
-            JSONArray tables = message.getJSONArray(ADFJsonConstants.ADF_REQUEST_TABLES);
-            for (int i = 0; i < tables.length(); i++) {
-                JSONObject table = tables.getJSONObject(i);
-                tablesMap.put(table.getString(ADFJsonConstants.ADF_REQUEST_NAME), table);
-            }
-
-            // Set the activity extended properties
-            activity = message.getJSONObject(ADFJsonConstants.ADF_REQUEST_ACTIVITY);
-            if (activity == null) {
-                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_ACTIVITY + " not found in ADF"
-                        + " request.");
-            }
-
-            JSONObject policy = activity.getJSONObject(ADFJsonConstants.ADF_REQUEST_POLICY);
-            /* IS policy mandatory */
-            if (policy == null) {
-                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_POLICY + " not found"
-                        + " in ADF request.");
-            }
-            String adfTimeout = policy.getString(ADFJsonConstants.ADF_REQUEST_TIMEOUT);
-            if (StringUtils.isBlank(adfTimeout)) {
-                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_TIMEOUT + " not found"
-                        + " in ADF request.");
-            }
-            timeout = parseADFRequestTimeout(adfTimeout);
-
-            JSONObject activityProperties = activity.getJSONObject(ADFJsonConstants.ADF_REQUEST_TRANSFORMATION);
-            if (activityProperties == null) {
-                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_TRANSFORMATION + " not found"
-                        + " in ADF request.");
-            }
-
-            activityExtendedProperties = activityProperties.getJSONObject(
-                    ADFJsonConstants.ADF_REQUEST_EXTENDED_PROPERTIES);
-            if (activityExtendedProperties == null) {
-                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_EXTENDED_PROPERTIES + " not"
-                        + " found in ADF request.");
-            }
-
-            // should be called after setting activityExtendedProperties
-            proxyUser = getRunAsUser();
-
-            // log in the user
-            CurrentUser.authenticate(proxyUser);
-        } catch (JSONException e) {
-            throw new FalconException("Error while parsing ADF JSON message: " + msg, e);
-        }
-    }
-
-    public String jobEntityName() {
-        return ADF_JOB_ENTITY_NAME_PREFIX + id;
-    }
-
-    public JobType jobType() {
-        return type;
-    }
-
-    protected String getClusterName(String linkedServiceName) throws FalconException {
-        JSONObject linkedService = linkedServicesMap.get(linkedServiceName);
-        if (linkedService == null) {
-            throw new FalconException("Linked service " + linkedServiceName + " not found in ADF request.");
-        }
-
-        try {
-            return linkedService.getJSONObject(ADFJsonConstants.ADF_REQUEST_PROPERTIES)
-                    .getJSONObject(ADFJsonConstants.ADF_REQUEST_EXTENDED_PROPERTIES)
-                    .getString(ADFJsonConstants.ADF_REQUEST_CLUSTER_NAME);
-        } catch (JSONException e) {
-            throw new FalconException("Error while parsing linked service " + linkedServiceName + " in ADF request.");
-        }
-    }
-
-    protected String getRunAsUser() throws FalconException {
-        if (activityExtendedProperties.has(ADFJsonConstants.ADF_REQUEST_RUN_ON_BEHALF_USER)) {
-            String runAsUser = null;
-            try {
-                runAsUser = activityExtendedProperties.getString(ADFJsonConstants.ADF_REQUEST_RUN_ON_BEHALF_USER);
-            } catch (JSONException e) {
-                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_RUN_ON_BEHALF_USER + " not"
-                        + " found in ADF request.");
-            }
-
-            if (StringUtils.isBlank(runAsUser)) {
-                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_RUN_ON_BEHALF_USER + " in"
-                        + " ADF request activity extended properties cannot be empty.");
-            }
-            return runAsUser;
-        } else {
-            String hadoopLinkedService = getHadoopLinkedService();
-            JSONObject linkedService = linkedServicesMap.get(hadoopLinkedService);
-            if (linkedService == null) {
-                throw new FalconException("JSON object " + hadoopLinkedService + " not"
-                        + " found in ADF request.");
-            }
-
-            try {
-                return linkedService.getJSONObject(ADFJsonConstants.ADF_REQUEST_PROPERTIES)
-                        .getJSONObject(ADFJsonConstants.ADF_REQUEST_EXTENDED_PROPERTIES)
-                        .getString(ADFJsonConstants.ADF_REQUEST_RUN_ON_BEHALF_USER);
-            } catch (JSONException e) {
-                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_RUN_ON_BEHALF_USER + " not"
-                        + " found in ADF request.");
-            }
-        }
-    }
-
-    protected List<String> getInputTables() throws FalconException {
-        List<String> tables = new ArrayList<String>();
-        try {
-            JSONArray inputs = message.getJSONObject(ADFJsonConstants.ADF_REQUEST_ACTIVITY)
-                    .getJSONArray(ADFJsonConstants.ADF_REQUEST_INPUTS);
-            for (int i = 0; i < inputs.length(); i++) {
-                tables.add(inputs.getJSONObject(i).getString(ADFJsonConstants.ADF_REQUEST_NAME));
-            }
-        } catch (JSONException e) {
-            throw new FalconException("Error while reading input table names in ADF request.");
-        }
-        return tables;
-    }
-
-    protected List<String> getOutputTables() throws FalconException {
-        List<String> tables = new ArrayList<String>();
-        try {
-            JSONArray outputs = message.getJSONObject(ADFJsonConstants.ADF_REQUEST_ACTIVITY)
-                    .getJSONArray(ADFJsonConstants.ADF_REQUEST_OUTPUTS);
-            for (int i = 0; i < outputs.length(); i++) {
-                tables.add(outputs.getJSONObject(i).getString(ADFJsonConstants.ADF_REQUEST_NAME));
-            }
-        } catch (JSONException e) {
-            throw new FalconException("Error while reading output table names in ADF request.");
-        }
-        return tables;
-    }
-
-    protected String getADFTablePath(String tableName) throws FalconException {
-        JSONObject table = tablesMap.get(tableName);
-        if (table == null) {
-            throw new FalconException("JSON object " + tableName + " not"
-                    + " found in ADF request.");
-        }
-
-        try {
-            JSONObject location = table.getJSONObject(ADFJsonConstants.ADF_REQUEST_PROPERTIES)
-                    .getJSONObject(ADFJsonConstants.ADF_REQUEST_LOCATION);
-            String requestType = location.getString(ADFJsonConstants.ADF_REQUEST_TYPE);
-            if (requestType.equals(ADFJsonConstants.ADF_REQUEST_LOCATION_TYPE_AZURE_BLOB)) {
-                String blobPath = location.getString(ADFJsonConstants.ADF_REQUEST_FOLDER_PATH);
-                int index = blobPath.indexOf('/');
-                if (index == -1) {
-                    throw new FalconException("Invalid azure blob path: " + blobPath);
-                }
-
-                String linkedServiceName = location.getString(ADFJsonConstants.ADF_REQUEST_LINKED_SERVICE_NAME);
-                JSONObject linkedService = linkedServicesMap.get(linkedServiceName);
-                if (linkedService == null) {
-                    throw new FalconException("Can't find linked service " + linkedServiceName + " for azure blob");
-                }
-                String connectionString = linkedService.getJSONObject(ADFJsonConstants.ADF_REQUEST_PROPERTIES)
-                        .getString(ADFJsonConstants.ADF_REQUEST_CONNECTION_STRING);
-                int accountNameIndex = connectionString.indexOf(ADFJsonConstants.ADF_REQUEST_BLOB_ACCOUNT_NAME)
-                        + ADFJsonConstants.ADF_REQUEST_BLOB_ACCOUNT_NAME.length();
-                String accountName = connectionString.substring(accountNameIndex,
-                        connectionString.indexOf(';', accountNameIndex));
-
-                StringBuilder blobUrl = new StringBuilder("wasb://")
-                        .append(blobPath.substring(0, index)).append("@")
-                        .append(accountName).append(".blob.core.windows.net")
-                        .append(blobPath.substring(index));
-                return blobUrl.toString();
-            }
-            return location.getJSONObject(ADFJsonConstants.ADF_REQUEST_EXTENDED_PROPERTIES)
-                    .getString(ADFJsonConstants.ADF_REQUEST_FOLDER_PATH);
-        } catch (JSONException e) {
-            throw new FalconException("Error while parsing ADF JSON message: " + tableName, e);
-        }
-    }
-
-    protected String getTableCluster(String tableName) throws FalconException {
-        JSONObject table = tablesMap.get(tableName);
-        if (table == null) {
-            throw new FalconException("Table " + tableName + " not found in ADF request.");
-        }
-
-        try {
-            String linkedServiceName = table.getJSONObject(ADFJsonConstants.ADF_REQUEST_PROPERTIES)
-                    .getJSONObject(ADFJsonConstants.ADF_REQUEST_LOCATION)
-                    .getString(ADFJsonConstants.ADF_REQUEST_LINKED_SERVICE_NAME);
-            return getClusterName(linkedServiceName);
-        } catch (JSONException e) {
-            throw new FalconException("Error while parsing table cluster " + tableName + " in ADF request.");
-        }
-    }
-
-    protected boolean activityHasScriptPath() throws FalconException {
-        if (JobType.REPLICATION == jobType()) {
-            return false;
-        }
-
-        return activityExtendedProperties.has(
-                ADFJsonConstants.ADF_REQUEST_SCRIPT_PATH);
-    }
-
-    protected String getScriptPath() throws FalconException {
-        if (!activityHasScriptPath()) {
-            throw new FalconException("JSON object does not have object: "
-                    + ADFJsonConstants.ADF_REQUEST_SCRIPT_PATH);
-        }
-
-        try {
-            String scriptPath = activityExtendedProperties.getString(ADFJsonConstants.ADF_REQUEST_SCRIPT_PATH);
-            if (StringUtils.isBlank(scriptPath)) {
-                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_SCRIPT_PATH + " not"
-                        + " found or empty in ADF request.");
-            }
-            return scriptPath;
-        } catch (JSONException jsonException) {
-            throw new FalconException("Error while parsing ADF JSON object: "
-                    + activityExtendedProperties, jsonException);
-        }
-    }
-
-    protected String getScriptContent() throws FalconException {
-        if (activityHasScriptPath()) {
-            throw new FalconException("JSON object does not have object: " + ADFJsonConstants.ADF_REQUEST_SCRIPT);
-        }
-        try {
-            String script = activityExtendedProperties.getString(ADFJsonConstants.ADF_REQUEST_SCRIPT);
-            if (StringUtils.isBlank(script)) {
-                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_SCRIPT + " cannot"
-                        + " be empty in ADF request.");
-            }
-            return script;
-        } catch (JSONException jsonException) {
-            throw new FalconException("Error while parsing ADF JSON object: "
-                    + activityExtendedProperties, jsonException);
-        }
-    }
-
-    protected String getClusterNameToRunProcessOn() throws FalconException {
-        return getClusterName(getHadoopLinkedService());
-    }
-
-    protected Entity submitAndScheduleJob(String entityType, String msg) throws FalconException {
-        Entity entity = jobManager.submitJob(entityType, msg);
-        jobManager.scheduleJob(entityType, jobEntityName());
-        return entity;
-    }
-
-    private String getHadoopLinkedService() throws FalconException {
-        String hadoopLinkedService;
-        try {
-            hadoopLinkedService = activity.getString(ADFJsonConstants.ADF_REQUEST_LINKED_SERVICE_NAME);
-        } catch (JSONException jsonException) {
-            throw new FalconException("Error while parsing ADF JSON object: "
-                    + activity, jsonException);
-        }
-
-        if (StringUtils.isBlank(hadoopLinkedService)) {
-            throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_LINKED_SERVICE_NAME
-                    + " in the activity cannot be empty in ADF request.");
-        }
-        return hadoopLinkedService;
-    }
-
-    protected void startProcess(Feed inputFeed, Feed outputFeed,
-                                String engineType, String scriptPath) throws FalconException {
-        // submit input/output feeds
-        LOG.info("submitting input feed {} for {} process", inputFeed.getName(), engineType);
-        jobManager.submitJob(EntityType.FEED.name(), inputFeed.getEntityxml());
-
-        LOG.info("submitting output feed {} for {} process", outputFeed.getName(), engineType);
-        jobManager.submitJob(EntityType.FEED.name(), outputFeed.getEntityxml());
-
-        // submit and schedule process
-        String processRequest = new Process.Builder().withProcessName(jobEntityName()).withFrequency(frequency)
-                .withStartTime(startTime).withEndTime(endTime).withClusterName(getClusterNameToRunProcessOn())
-                .withInputFeedName(inputFeed.getName()).withOutputFeedName(outputFeed.getName())
-                .withEngineType(engineType).withWFPath(scriptPath).withAclOwner(proxyUser)
-                .build().getEntityxml();
-
-        LOG.info("submitting/scheduling {} process: {}", engineType, processRequest);
-        submitAndScheduleJob(EntityType.PROCESS.name(), processRequest);
-        LOG.info("submitted and scheduled {} process: {}", engineType, jobEntityName());
-    }
-
-    protected void cleanupProcess(Feed inputFeed, Feed outputFeed) throws FalconException {
-        // delete the entities. Should be called after the job execution success/failure.
-        jobManager.deleteEntity(EntityType.PROCESS.name(), jobEntityName());
-        jobManager.deleteEntity(EntityType.FEED.name(), inputFeed.getName());
-        jobManager.deleteEntity(EntityType.FEED.name(), outputFeed.getName());
-
-        // delete script file
-        FSUtils.removeDir(new Path(ADFJob.PROCESS_SCRIPTS_PATH, jobEntityName()));
-    }
-
-    protected String createScriptFile(String fileExt) throws FalconException {
-        String content = getScriptContent();
-
-        // create dir; dir path is unique as job name is always unique
-        final Path dir = new Path(ADFJob.PROCESS_SCRIPTS_PATH, jobEntityName());
-        FSUtils.createDir(dir);
-
-        // create script file
-        final Path path = new Path(dir, jobEntityName() + fileExt);
-        return FSUtils.createFile(path, content);
-    }
-
-    private static long parseADFRequestTimeout(String timeout) throws FalconException {
-        timeout = timeout.trim();
-        //  [ws][-]{ d | d.hh:mm[:ss[.ff]] | hh:mm[:ss[.ff]] }[ws]
-        if (timeout.startsWith("-")) {
-            return -1;
-        }
-
-        long totalMinutes = 0;
-        String [] dotParts = timeout.split(Pattern.quote("."));
-        if (dotParts.length == 1) {
-            // no d or ff
-            // chk if only d
-            // Formats can be d|hh:mm[:ss]
-            String[] parts = timeout.split(":");
-            if (parts.length == 1) {
-                // only day. Convert days to minutes
-                return Integer.parseInt(parts[0]) * 1440;
-            } else {
-                // hh:mm[:ss]
-                return computeMinutes(parts);
-            }
-        }
-
-        // if . is present, formats can be d.hh:mm[:ss[.ff]] | hh:mm[:ss[.ff]]
-        if (dotParts.length == 2) {
-            // can be d.hh:mm[:ss] or hh:mm[:ss[.ff]
-            // check if first part has colons
-            String [] parts = dotParts[0].split(":");
-            if (parts.length == 1) {
-                // format is d.hh:mm[:ss]
-                totalMinutes = Integer.parseInt(dotParts[0]) * 1440;
-                parts = dotParts[1].split(":");
-                totalMinutes += computeMinutes(parts);
-                return totalMinutes;
-            } else {
-                // format is hh:mm[:ss[.ff]
-                parts = dotParts[0].split(":");
-                totalMinutes += computeMinutes(parts);
-                // round off ff
-                totalMinutes +=  1;
-                return totalMinutes;
-            }
-        } else if (dotParts.length == 3) {
-            // will be d.hh:mm[:ss[.ff]
-            totalMinutes = Integer.parseInt(dotParts[0]) * 1440;
-            String [] parts = dotParts[1].split(":");
-            totalMinutes += computeMinutes(parts);
-            // round off ff
-            totalMinutes +=  1;
-            return totalMinutes;
-        } else {
-            throw new FalconException("Error parsing policy timeout: " + timeout);
-        }
-    }
-
-    // format hh:mm[:ss]
-    private static long computeMinutes(String[] parts) {
-        // hh:mm[:ss]
-        int totalMinutes = Integer.parseInt(parts[0]) * 60;
-        totalMinutes += Integer.parseInt(parts[1]);
-        if (parts.length == 3) {
-            // Second round off to minutes
-            totalMinutes +=  1;
-        }
-        return totalMinutes;
-    }
-
-    private static String transformTimeFormat(String adfTime) {
-        return adfTime.substring(0, adfTime.length()-4) + "Z";
-    }
-
-    protected class ADFJobManager extends AbstractSchedulableEntityManager {
-        public Entity submitJob(String entityType, String msg) throws FalconException {
-            try {
-                InputStream stream = IOUtils.toInputStream(msg);
-                Entity entity = submitInternal(stream, entityType, proxyUser);
-                return entity;
-            } catch (Exception e) {
-                LOG.info(e.toString());
-                throw new FalconException("Error when submitting job: " + e.toString());
-            }
-        }
-
-        public void scheduleJob(String entityType, String entityName) throws FalconException {
-            try {
-                scheduleInternal(entityType, entityName, null, EntityUtil.getPropertyMap(null));
-            } catch (Exception e) {
-                LOG.info(e.toString());
-                throw new FalconException("Error when scheduling job: " + e.toString());
-            }
-        }
-
-        public void deleteEntity(String entityType, String entityName) throws FalconException {
-            delete(entityType, entityName, null);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFJobFactory.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFJobFactory.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFJobFactory.java
deleted file mode 100644
index ceea6a4..0000000
--- a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFJobFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.adfservice;
-
-import org.apache.falcon.FalconException;
-
-/**
- * Azure ADB Job factory to generate ADFJob for each job type.
- */
-public final class ADFJobFactory {
-    public static ADFJob buildADFJob(String msg, String id) throws FalconException {
-        ADFJob.JobType jobType = ADFJob.getJobType(msg);
-        switch (jobType) {
-        case REPLICATION:
-            return new ADFReplicationJob(msg, id);
-        case HIVE:
-            return new ADFHiveJob(msg, id);
-        case PIG:
-            return new ADFPigJob(msg, id);
-        default:
-            throw new FalconException("Invalid job type: " + jobType.toString());
-        }
-    }
-
-    private ADFJobFactory() {
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFPigJob.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFPigJob.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFPigJob.java
deleted file mode 100644
index 041eb48..0000000
--- a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFPigJob.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.adfservice;
-
-import org.apache.falcon.FalconException;
-
-/**
- * Azure ADF Pig Job.
- */
-public class ADFPigJob extends ADFJob {
-    private static final String PIG_SCRIPT_EXTENSION = ".pig";
-    private static final String ENGINE_TYPE = "pig";
-    private static final String INPUT_FEED_SUFFIX = "-pig-input-feed";
-    private static final String OUTPUT_FEED_SUFFIX = "-pig-output-feed";
-
-    private String pigScriptPath;
-    private DataFeed inputDataFeed;
-    private DataFeed outputDataFeed;
-
-    public ADFPigJob(String message, String id) throws FalconException {
-        super(message, id);
-        type = JobType.PIG;
-        inputDataFeed = getInputFeed();
-        outputDataFeed = getOutputFeed();
-        pigScriptPath = activityHasScriptPath() ? getScriptPath() : createScriptFile(PIG_SCRIPT_EXTENSION);
-    }
-
-    @Override
-    public void startJob() throws FalconException {
-        startProcess(inputDataFeed, outputDataFeed, ENGINE_TYPE, pigScriptPath);
-    }
-
-    @Override
-    public void cleanup() throws FalconException {
-        cleanupProcess(inputDataFeed, outputDataFeed);
-    }
-
-    private DataFeed getInputFeed() throws FalconException {
-        return getFeed(jobEntityName() + INPUT_FEED_SUFFIX, getInputTables().get(0),
-                getTableCluster(getInputTables().get(0)));
-    }
-
-    private DataFeed getOutputFeed() throws FalconException {
-        return getFeed(jobEntityName() + OUTPUT_FEED_SUFFIX, getOutputTables().get(0),
-                getTableCluster(getOutputTables().get(0)));
-    }
-
-    private DataFeed getFeed(final String feedName, final String tableName,
-                             final String clusterName) throws FalconException {
-        return new DataFeed.Builder().withFeedName(feedName).withFrequency(frequency)
-                .withClusterName(clusterName).withStartTime(startTime).withEndTime(endTime)
-                .withAclOwner(proxyUser).withLocationPath(getADFTablePath(tableName)).build();
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFProviderService.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFProviderService.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFProviderService.java
deleted file mode 100644
index 3438b2f..0000000
--- a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFProviderService.java
+++ /dev/null
@@ -1,370 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.adfservice;
-
-import com.microsoft.windowsazure.Configuration;
-import com.microsoft.windowsazure.exception.ServiceException;
-import com.microsoft.windowsazure.services.servicebus.ServiceBusService;
-import com.microsoft.windowsazure.services.servicebus.models.BrokeredMessage;
-import com.microsoft.windowsazure.services.servicebus.models.ReceiveMessageOptions;
-import com.microsoft.windowsazure.services.servicebus.models.ReceiveMode;
-import com.microsoft.windowsazure.services.servicebus.models.ReceiveQueueMessageResult;
-import com.microsoft.windowsazure.services.servicebus.ServiceBusConfiguration;
-import com.microsoft.windowsazure.services.servicebus.ServiceBusContract;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.adfservice.util.ADFJsonConstants;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.resource.AbstractInstanceManager;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.falcon.resource.InstancesResult.Instance;
-import org.apache.falcon.resource.InstancesResult.WorkflowStatus;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.falcon.service.FalconService;
-import org.apache.falcon.service.Services;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.falcon.workflow.WorkflowExecutionListener;
-import org.apache.falcon.workflow.WorkflowExecutionContext;
-import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Falcon ADF provider to handle requests from Azure Data Factory.
- */
-public class ADFProviderService implements FalconService, WorkflowExecutionListener {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ADFProviderService.class);
-
-    /**
-     * Constant for the service name.
-     */
-    public static final String SERVICE_NAME = ADFProviderService.class.getSimpleName();
-
-    private static final int AZURE_SERVICEBUS_RECEIVEMESSGAEOPT_TIMEOUT = 60;
-    // polling frequency in seconds
-    private static final int AZURE_SERVICEBUS_DEFAULT_POLLING_FREQUENCY = 10;
-
-    // Number of threads to handle ADF requests
-    private static final int AZURE_SERVICEBUS_REQUEST_HANDLING_THREADS = 5;
-
-    private static final String AZURE_SERVICEBUS_CONF_PREFIX = "microsoft.windowsazure.services.servicebus.";
-    private static final String AZURE_SERVICEBUS_CONF_SASKEYNAME = "sasKeyName";
-    private static final String AZURE_SERVICEBUS_CONF_SASKEY = "sasKey";
-    private static final String AZURE_SERVICEBUS_CONF_SERVICEBUSROOTURI = "serviceBusRootUri";
-    private static final String AZURE_SERVICEBUS_CONF_NAMESPACE = "namespace";
-    private static final String AZURE_SERVICEBUS_CONF_POLLING_FREQUENCY = "polling.frequency";
-    private static final String AZURE_SERVICEBUS_CONF_REQUEST_QUEUE_NAME = "requestqueuename";
-    private static final String AZURE_SERVICEBUS_CONF_STATUS_QUEUE_NAME = "statusqueuename";
-    private static final String AZURE_SERVICEBUS_CONF_SUPER_USER = "superuser";
-
-    private static final ConfigurationStore STORE = ConfigurationStore.get();
-
-    private ServiceBusContract service;
-    private ScheduledExecutorService adfScheduledExecutorService;
-    private ReceiveMessageOptions opts = ReceiveMessageOptions.DEFAULT;
-    private ADFInstanceManager instanceManager = new ADFInstanceManager();
-    private String requestQueueName;
-    private String statusQueueName;
-    private String superUser;
-
-    @Override
-    public String getName() {
-        return SERVICE_NAME;
-    }
-
-    @Override
-    public void init() throws FalconException {
-        // read start up properties for adf configuration
-        service = ServiceBusService.create(getServiceBusConfig());
-
-        requestQueueName = StartupProperties.get().getProperty(AZURE_SERVICEBUS_CONF_PREFIX
-                + AZURE_SERVICEBUS_CONF_REQUEST_QUEUE_NAME);
-        if (StringUtils.isBlank(requestQueueName)) {
-            throw new FalconException(AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_REQUEST_QUEUE_NAME
-                    + " property not set in startup properties. Please add it.");
-        }
-        statusQueueName = StartupProperties.get().getProperty(AZURE_SERVICEBUS_CONF_PREFIX
-                + AZURE_SERVICEBUS_CONF_STATUS_QUEUE_NAME);
-        if (StringUtils.isBlank(statusQueueName)) {
-            throw new FalconException(AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_STATUS_QUEUE_NAME
-                    + " property not set in startup properties. Please add it.");
-        }
-
-        // init opts
-        opts.setReceiveMode(ReceiveMode.PEEK_LOCK);
-        opts.setTimeout(AZURE_SERVICEBUS_RECEIVEMESSGAEOPT_TIMEOUT);
-
-        // restart handling
-        superUser = StartupProperties.get().getProperty(
-                AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_SUPER_USER);
-        if (StringUtils.isBlank(superUser)) {
-            throw new FalconException(AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_SUPER_USER
-                    + " property not set in startup properties. Please add it.");
-        }
-        CurrentUser.authenticate(superUser);
-        for (EntityType entityType : EntityType.values()) {
-            Collection<String> entities = STORE.getEntities(entityType);
-            for (String entityName : entities) {
-                updateJobStatus(entityName, entityType.toString());
-            }
-        }
-
-        Services.get().<WorkflowJobEndNotificationService>getService(
-                WorkflowJobEndNotificationService.SERVICE_NAME).registerListener(this);
-        adfScheduledExecutorService = new ADFScheduledExecutor(AZURE_SERVICEBUS_REQUEST_HANDLING_THREADS);
-        adfScheduledExecutorService.scheduleWithFixedDelay(new HandleADFRequests(), 0, getDelay(), TimeUnit.SECONDS);
-        LOG.info("Falcon ADFProvider service initialized");
-    }
-
-    private class HandleADFRequests implements Runnable {
-
-        @Override
-        public void run() {
-            String sessionID = null;
-            try {
-                LOG.info("To read message from adf...");
-                ReceiveQueueMessageResult resultQM =
-                        service.receiveQueueMessage(requestQueueName, opts);
-                BrokeredMessage message = resultQM.getValue();
-                if (message != null && message.getMessageId() != null) {
-                    sessionID = message.getReplyToSessionId();
-                    BufferedReader rd = new BufferedReader(
-                            new InputStreamReader(message.getBody()));
-                    StringBuilder sb = new StringBuilder();
-                    String line;
-                    while ((line = rd.readLine()) != null) {
-                        sb.append(line);
-                    }
-                    rd.close();
-                    String msg = sb.toString();
-                    LOG.info("ADF message: " + msg);
-
-                    service.deleteMessage(message);
-
-                    ADFJob job = ADFJobFactory.buildADFJob(msg, sessionID);
-                    job.startJob();
-                } else {
-                    LOG.info("No message from adf");
-                }
-            } catch (FalconException e) {
-                if (sessionID != null) {
-                    sendErrorMessage(sessionID, e.toString());
-                }
-                LOG.info(e.toString());
-            } catch (ServiceException | IOException e) {
-                LOG.info(e.toString());
-            }
-        }
-    }
-
-    private static Configuration getServiceBusConfig() throws FalconException {
-        String namespace = StartupProperties.get().getProperty(AZURE_SERVICEBUS_CONF_PREFIX
-                + AZURE_SERVICEBUS_CONF_NAMESPACE);
-        if (StringUtils.isBlank(namespace)) {
-            throw new FalconException(AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_NAMESPACE
-                    + " property not set in startup properties. Please add it.");
-        }
-
-        String sasKeyName = StartupProperties.get().getProperty(AZURE_SERVICEBUS_CONF_PREFIX
-                + AZURE_SERVICEBUS_CONF_SASKEYNAME);
-        if (StringUtils.isBlank(sasKeyName)) {
-            throw new FalconException(AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_SASKEYNAME
-                    + " property not set in startup properties. Please add it.");
-        }
-
-        String sasKey = StartupProperties.get().getProperty(AZURE_SERVICEBUS_CONF_PREFIX
-                + AZURE_SERVICEBUS_CONF_SASKEY);
-        if (StringUtils.isBlank(sasKey)) {
-            throw new FalconException(AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_SASKEY
-                    + " property not set in startup properties. Please add it.");
-        }
-
-        String serviceBusRootUri = StartupProperties.get().getProperty(AZURE_SERVICEBUS_CONF_PREFIX
-                + AZURE_SERVICEBUS_CONF_SERVICEBUSROOTURI);
-        if (StringUtils.isBlank(serviceBusRootUri)) {
-            throw new FalconException(AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_SERVICEBUSROOTURI
-                    + " property not set in startup properties. Please add it.");
-        }
-
-        LOG.info("namespace: {}, sas key name: {}, sas key: {}, root uri: {}",
-                        namespace, sasKeyName, sasKey, serviceBusRootUri);
-        return ServiceBusConfiguration.configureWithSASAuthentication(namespace, sasKeyName, sasKey,
-                serviceBusRootUri);
-    }
-
-
-    // gets delay in seconds
-    private long getDelay() throws FalconException {
-        String pollingFrequencyValue = StartupProperties.get().getProperty(AZURE_SERVICEBUS_CONF_PREFIX
-                + AZURE_SERVICEBUS_CONF_POLLING_FREQUENCY);
-        long pollingFrequency;
-        try {
-            pollingFrequency = (StringUtils.isNotEmpty(pollingFrequencyValue))
-                    ? Long.parseLong(pollingFrequencyValue) : AZURE_SERVICEBUS_DEFAULT_POLLING_FREQUENCY;
-        } catch (NumberFormatException nfe) {
-            throw new FalconException("Invalid value provided for startup property "
-                    + AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_POLLING_FREQUENCY
-                    + ", please provide a valid long number", nfe);
-        }
-        return pollingFrequency;
-    }
-
-    @Override
-    public void destroy() throws FalconException {
-        Services.get().<WorkflowJobEndNotificationService>getService(
-                WorkflowJobEndNotificationService.SERVICE_NAME).unregisterListener(this);
-        adfScheduledExecutorService.shutdown();
-    }
-
-    @Override
-    public void onSuccess(WorkflowExecutionContext context) throws FalconException {
-        updateJobStatus(context, ADFJsonConstants.ADF_STATUS_SUCCEEDED, 100);
-    }
-
-    @Override
-    public void onFailure(WorkflowExecutionContext context) throws FalconException {
-        updateJobStatus(context, ADFJsonConstants.ADF_STATUS_FAILED, 0);
-    }
-
-    @Override
-    public void onStart(WorkflowExecutionContext context) throws FalconException {
-        updateJobStatus(context, ADFJsonConstants.ADF_STATUS_EXECUTING, 0);
-    }
-
-    @Override
-    public void onSuspend(WorkflowExecutionContext context) throws FalconException {
-        updateJobStatus(context, ADFJsonConstants.ADF_STATUS_CANCELED, 0);
-    }
-
-    @Override
-    public void onWait(WorkflowExecutionContext context) throws FalconException {
-        updateJobStatus(context, ADFJsonConstants.ADF_STATUS_EXECUTING, 0);
-    }
-
-    private void updateJobStatus(String entityName, String entityType) throws FalconException {
-        // Filter non-adf jobs
-        if (!ADFJob.isADFJobEntity(entityName)) {
-            return;
-        }
-
-        Instance instance = instanceManager.getFirstInstance(entityName, entityType);
-        if (instance == null) {
-            return;
-        }
-
-        WorkflowStatus workflowStatus = instance.getStatus();
-        String status;
-        int progress = 0;
-        switch (workflowStatus) {
-        case SUCCEEDED:
-            progress = 100;
-            status = ADFJsonConstants.ADF_STATUS_SUCCEEDED;
-            break;
-        case FAILED:
-        case KILLED:
-        case ERROR:
-        case SKIPPED:
-        case UNDEFINED:
-            status = ADFJsonConstants.ADF_STATUS_FAILED;
-            break;
-        default:
-            status = ADFJsonConstants.ADF_STATUS_EXECUTING;
-        }
-        updateJobStatus(entityName, status, progress, instance.getLogFile());
-    }
-
-    private void updateJobStatus(WorkflowExecutionContext context, String status, int progress) {
-        // Filter non-adf jobs
-        String entityName = context.getEntityName();
-        if (!ADFJob.isADFJobEntity(entityName)) {
-            return;
-        }
-
-        updateJobStatus(entityName, status, progress, context.getLogFile());
-    }
-
-    private void updateJobStatus(String entityName, String status, int progress, String logUrl) {
-        try {
-            String sessionID = ADFJob.getSessionID(entityName);
-            LOG.info("To update job status: " + sessionID + ", " + entityName + ", " + status + ", " + logUrl);
-            JSONObject obj = new JSONObject();
-            obj.put(ADFJsonConstants.ADF_STATUS_PROTOCOL, ADFJsonConstants.ADF_STATUS_PROTOCOL_NAME);
-            obj.put(ADFJsonConstants.ADF_STATUS_JOBID, sessionID);
-            obj.put(ADFJsonConstants.ADF_STATUS_LOG_URL, logUrl);
-            obj.put(ADFJsonConstants.ADF_STATUS_STATUS, status);
-            obj.put(ADFJsonConstants.ADF_STATUS_PROGRESS, progress);
-            sendStatusUpdate(sessionID, obj.toString());
-        } catch (JSONException | FalconException e) {
-            LOG.info("Error when updating job status: " + e.toString());
-        }
-    }
-
-    private void sendErrorMessage(String sessionID, String errorMessage) {
-        LOG.info("Sending error message for session " + sessionID + ": " + errorMessage);
-        try {
-            JSONObject obj = new JSONObject();
-            obj.put(ADFJsonConstants.ADF_STATUS_PROTOCOL, ADFJsonConstants.ADF_STATUS_PROTOCOL_NAME);
-            obj.put(ADFJsonConstants.ADF_STATUS_JOBID, sessionID);
-            obj.put(ADFJsonConstants.ADF_STATUS_STATUS, ADFJsonConstants.ADF_STATUS_FAILED);
-            obj.put(ADFJsonConstants.ADF_STATUS_PROGRESS, 0);
-            obj.put(ADFJsonConstants.ADF_STATUS_ERROR_TYPE, ADFJsonConstants.ADF_STATUS_ERROR_TYPE_VALUE);
-            obj.put(ADFJsonConstants.ADF_STATUS_ERROR_MESSAGE, errorMessage);
-            sendStatusUpdate(sessionID, obj.toString());
-        } catch (JSONException e) {
-            LOG.info("Error when sending error message: " + e.toString());
-        }
-    }
-
-    private void sendStatusUpdate(String sessionID, String message) {
-        LOG.info("Sending update for session " + sessionID + ": " + message);
-        try {
-            InputStream in = IOUtils.toInputStream(message, "UTF-8");
-            BrokeredMessage updateMessage = new BrokeredMessage(in);
-            updateMessage.setSessionId(sessionID);
-            service.sendQueueMessage(statusQueueName, updateMessage);
-        } catch (IOException | ServiceException e) {
-            LOG.info("Error when sending status update: " + e.toString());
-        }
-    }
-
-    private static class ADFInstanceManager extends AbstractInstanceManager {
-        public Instance getFirstInstance(String entityName, String entityType) throws FalconException {
-            InstancesResult result = getStatus(entityType, entityName, null, null, null, null, "", "", "", 0, 1, null);
-            Instance[] instances = result.getInstances();
-            if (instances.length > 0) {
-                return instances[0];
-            }
-            return null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFReplicationJob.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFReplicationJob.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFReplicationJob.java
deleted file mode 100644
index f847a82..0000000
--- a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFReplicationJob.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.adfservice;
-
-import java.net.URISyntaxException;
-
-import org.apache.falcon.adfservice.util.FSUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.EntityType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Azure ADF Replication Job (hive/hdfs to Azure blobs).
- */
-public class ADFReplicationJob extends ADFJob {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ADFReplicationJob.class);
-
-    public static final String TEMPLATE_REPLICATION_FEED = "replicate-feed.xml";
-    public static final String REPLICATION_TARGET_CLUSTER = "adf-replication-target-cluster";
-
-    public ADFReplicationJob(String message, String id) throws FalconException {
-        super(message, id);
-        type = JobType.REPLICATION;
-    }
-
-    @Override
-    public void startJob() throws FalconException {
-        try {
-            // Note: in first clickstop, we support only one input table and one output table for replication job
-            String inputTableName = getInputTables().get(0);
-            String outputTableName = getOutputTables().get(0);
-            String template = FSUtils.readHDFSFile(TEMPLATE_PATH_PREFIX, TEMPLATE_REPLICATION_FEED);
-            String message = template.replace("$feedName$", jobEntityName())
-                    .replace("$frequency$", frequency)
-                    .replace("$startTime$", startTime)
-                    .replace("$endTime$", endTime)
-                    .replace("$clusterSource$", getTableCluster(inputTableName))
-                    .replace("$clusterTarget$", REPLICATION_TARGET_CLUSTER)
-                    .replace("$sourceLocation$", getADFTablePath(inputTableName))
-                    .replace("$targetLocation$", getADFTablePath(outputTableName));
-            submitAndScheduleJob(EntityType.FEED.name(), message);
-        } catch (URISyntaxException e) {
-            LOG.info(e.toString());
-        }
-
-    }
-
-    @Override
-    public void cleanup() throws FalconException {
-        // Delete the entities. Should be called after the job execution success/failure.
-        jobManager.deleteEntity(EntityType.FEED.name(), jobEntityName());
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFScheduledExecutor.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFScheduledExecutor.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFScheduledExecutor.java
deleted file mode 100644
index df5a993..0000000
--- a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFScheduledExecutor.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.adfservice;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * ADF thread pool executor.
- */
-public class ADFScheduledExecutor extends ScheduledThreadPoolExecutor {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ADFScheduledExecutor.class);
-
-    public ADFScheduledExecutor(int corePoolSize) {
-        super(corePoolSize);
-    }
-
-    @Override
-    public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
-        return super.scheduleAtFixedRate(wrapRunnable(command), initialDelay, period, unit);
-    }
-
-    @Override
-    public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
-        return super.scheduleWithFixedDelay(wrapRunnable(command), initialDelay, delay, unit);
-    }
-
-    private Runnable wrapRunnable(Runnable command) {
-        return new LogOnExceptionRunnable(command);
-    }
-
-    private static class LogOnExceptionRunnable implements Runnable {
-        private Runnable runnable;
-
-        public LogOnExceptionRunnable(Runnable runnable) {
-            super();
-            this.runnable = runnable;
-        }
-
-        @Override
-        public void run() {
-            try {
-                runnable.run();
-            } catch (Throwable t) {
-                LOG.info("Error while executing: {}", t.getMessage());
-                throw new RuntimeException(t);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/adf/src/main/java/org/apache/falcon/adfservice/DataFeed.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/DataFeed.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/DataFeed.java
deleted file mode 100644
index 32d2757..0000000
--- a/addons/adf/src/main/java/org/apache/falcon/adfservice/DataFeed.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.adfservice;
-
-import org.apache.falcon.adfservice.util.FSUtils;
-import org.apache.falcon.FalconException;
-
-import java.net.URISyntaxException;
-
-/**
- * Class for data Feed.
- */
-public class DataFeed extends Feed {
-    private static final String FEED_TEMPLATE_FILE = "feed.xml";
-    private String locationPath;
-
-    public DataFeed(final Builder builder) {
-        this.feedName = builder.name;
-        this.clusterName = builder.feedClusterName;
-        this.frequency = builder.feedFrequency;
-        this.startTime = builder.feedStartTime;
-        this.endTime = builder.feedEndTime;
-        this.locationPath = builder.feedLocationPath;
-        this.aclOwner = builder.feedAclOwner;
-    }
-
-    @Override
-    public String getEntityxml() throws FalconException {
-        try {
-            String template = FSUtils.readHDFSFile(ADFJob.TEMPLATE_PATH_PREFIX, FEED_TEMPLATE_FILE);
-            return template.replace("$feedName$", feedName)
-                    .replace("$frequency$", frequency)
-                    .replace("$startTime$", startTime)
-                    .replace("$endTime$", endTime)
-                    .replace("$cluster$", clusterName)
-                    .replace("$location$", locationPath)
-                    .replace("$aclowner$", aclOwner);
-        } catch (URISyntaxException e) {
-            throw new FalconException("Error when generating entity xml for table feed", e);
-        }
-    }
-
-    /**
-     * Builder for table Feed.
-     */
-    public static class Builder {
-        private String name;
-        private String feedClusterName;
-        private String feedFrequency;
-        private String feedStartTime;
-        private String feedEndTime;
-        private String feedLocationPath;
-        private String feedAclOwner;
-
-        public DataFeed build() {
-            return new DataFeed(this);
-        }
-
-        public Builder withFeedName(final String feedName) {
-            this.name = feedName;
-            return this;
-        }
-
-        public Builder withClusterName(final String clusterName) {
-            this.feedClusterName = clusterName;
-            return this;
-        }
-
-        public Builder withFrequency(final String frequency) {
-            this.feedFrequency = frequency;
-            return this;
-        }
-
-        public Builder withStartTime(final String startTime) {
-            this.feedStartTime = startTime;
-            return this;
-        }
-
-        public Builder withEndTime(final String endTime) {
-            this.feedEndTime = endTime;
-            return this;
-        }
-
-        public Builder withLocationPath(final String locationPath) {
-            this.feedLocationPath = locationPath;
-            return this;
-        }
-
-        public Builder withAclOwner(final String aclOwner) {
-            this.feedAclOwner = aclOwner;
-            return this;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/adf/src/main/java/org/apache/falcon/adfservice/Feed.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/Feed.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/Feed.java
deleted file mode 100644
index d05f300..0000000
--- a/addons/adf/src/main/java/org/apache/falcon/adfservice/Feed.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.adfservice;
-
-import org.apache.falcon.FalconException;
-
-/**
- * Abstract class for feed.
- */
-public abstract class Feed {
-    protected String feedName;
-    protected String clusterName;
-    protected String frequency;
-    protected String startTime;
-    protected String endTime;
-    protected String aclOwner;
-
-    public String getName() {
-        return feedName;
-    }
-
-    public abstract String getEntityxml() throws FalconException;
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/adf/src/main/java/org/apache/falcon/adfservice/Process.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/Process.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/Process.java
deleted file mode 100644
index 3a65753..0000000
--- a/addons/adf/src/main/java/org/apache/falcon/adfservice/Process.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.adfservice;
-
-import org.apache.falcon.adfservice.util.FSUtils;
-import org.apache.falcon.FalconException;
-
-import java.net.URISyntaxException;
-
-/**
- * Class for process.
- */
-public class Process {
-    private static final String PROCESS_TEMPLATE_FILE = "process.xml";
-
-    private String entityName;
-    private String frequency;
-    private String startTime;
-    private String endTime;
-    private String clusterName;
-    private String inputFeedName;
-    private String outputFeedName;
-    private String engineType;
-    private String wfPath;
-    private String aclOwner;
-
-    public Process(final Builder builder) {
-        this.entityName = builder.name;
-        this.clusterName = builder.processClusterName;
-        this.frequency = builder.processFrequency;
-        this.startTime = builder.processStartTime;
-        this.endTime = builder.processEndTime;
-        this.inputFeedName = builder.processInputFeedName;
-        this.outputFeedName = builder.processOutputFeedName;
-        this.engineType = builder.processEngineType;
-        this.wfPath = builder.processWfPath;
-        this.aclOwner = builder.processAclOwner;
-    }
-
-    public String getName() {
-        return entityName;
-    }
-
-    public String getEntityxml() throws FalconException {
-        try {
-            String template = FSUtils.readHDFSFile(ADFJob.TEMPLATE_PATH_PREFIX, PROCESS_TEMPLATE_FILE);
-            return template.replace("$processName$", entityName)
-                    .replace("$frequency$", frequency)
-                    .replace("$startTime$", startTime)
-                    .replace("$endTime$", endTime)
-                    .replace("$clusterName$", clusterName)
-                    .replace("$inputFeedName$", inputFeedName)
-                    .replace("$outputFeedName$", outputFeedName)
-                    .replace("$engine$", engineType)
-                    .replace("$scriptPath$", wfPath)
-                    .replace("$aclowner$", aclOwner);
-        } catch (URISyntaxException e) {
-            throw new FalconException("Error when generating process xml", e);
-        }
-    }
-
-    /**
-     * Builder for process.
-     */
-    public static class Builder {
-        private String name;
-        private String processClusterName;
-        private String processFrequency;
-        private String processStartTime;
-        private String processEndTime;
-        private String processInputFeedName;
-        private String processOutputFeedName;
-        private String processEngineType;
-        private String processWfPath;
-        private String processAclOwner;
-
-        public Process build() {
-            return new Process(this);
-        }
-
-        public Builder withProcessName(final String processName) {
-            this.name = processName;
-            return this;
-        }
-
-        public Builder withClusterName(final String clusterName) {
-            this.processClusterName = clusterName;
-            return this;
-        }
-
-        public Builder withFrequency(final String frequency) {
-            this.processFrequency = frequency;
-            return this;
-        }
-
-        public Builder withStartTime(final String startTime) {
-            this.processStartTime = startTime;
-            return this;
-        }
-
-        public Builder withEndTime(final String endTime) {
-            this.processEndTime = endTime;
-            return this;
-        }
-
-        public Builder withInputFeedName(final String inputFeedName) {
-            this.processInputFeedName = inputFeedName;
-            return this;
-        }
-
-        public Builder withOutputFeedName(final String outputFeedName) {
-            this.processOutputFeedName = outputFeedName;
-            return this;
-        }
-
-        public Builder withAclOwner(final String aclOwner) {
-            this.processAclOwner = aclOwner;
-            return this;
-        }
-
-        public Builder withEngineType(final String engineType) {
-            this.processEngineType = engineType;
-            return this;
-        }
-
-        public Builder withWFPath(final String wfPath) {
-            this.processWfPath = wfPath;
-            return this;
-        }
-    }
-
-}


Mime
View raw message