falcon-commits mailing list archives

From peeyu...@apache.org
Subject [2/2] falcon git commit: FALCON-1627 Provider integration with Azure Data Factory pipelines
Date Fri, 19 Feb 2016 04:35:47 GMT
FALCON-1627 Provider integration with Azure Data Factory pipelines

This integration allows Microsoft Azure Data Factory pipelines to invoke Falcon activities (i.e. replication, Hive and Pig processing work), so users can build hybrid Hadoop data pipelines that leverage on-premises Hadoop clusters and cloud-based Cortana Analytics services such as HDInsight Hadoop clusters and Azure Machine Learning.
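Each incoming ADF activity is routed to one of these three Falcon activities by the RequestType-to-JobType switch in ADFJob.getJobType (in the ADFJob.java diff below). The following is a minimal, stdlib-only sketch of that routing. It assumes ADF sends mixed-case type strings such as HadoopHive, which the code upper-cases before the lookup. The class and method names are hypothetical.

```java
import java.util.Locale;

/**
 * Hypothetical sketch of the ADF activity type -> Falcon job type routing.
 * The enum constants are copied from ADFJob; everything else is illustrative.
 */
public class AdfJobTypeMapping {
    public enum JobType { HIVE, PIG, REPLICATION }

    // ADF transformation "type" values, compared after upper-casing.
    public enum RequestType { HADOOPMIRROR, HADOOPHIVE, HADOOPPIG }

    public static JobType toJobType(String adfType) {
        // valueOf throws IllegalArgumentException for an unknown type string.
        switch (RequestType.valueOf(adfType.toUpperCase(Locale.ROOT))) {
        case HADOOPMIRROR:
            return JobType.REPLICATION;
        case HADOOPHIVE:
            return JobType.HIVE;
        case HADOOPPIG:
            return JobType.PIG;
        default:
            throw new IllegalArgumentException("Unrecognized ADF job type: " + adfType);
        }
    }

    public static void main(String[] args) {
        System.out.println(toJobType("HadoopHive"));   // HIVE
        System.out.println(toJobType("HadoopMirror")); // REPLICATION
        System.out.println(toJobType("HadoopPig"));    // PIG
    }
}
```

In this sketch, as in ADFJob.getJobType, an unrecognized string fails inside valueOf with an unchecked IllegalArgumentException before the default branch is reached. In the committed code that exception is not caught by the JSONException handler, so it is not converted into the "Unrecognized ADF job type" FalconException.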

Incorporated review comments from Balu on the Apache Review Board:
https://reviews.apache.org/r/42826/

Author: yzheng-hortonworks <yzheng@hortonworks.com>

Reviewers: Sandeep Samudrala <sandysmdl@gmail.com>, Peeyush Bishnoi <peeyushb@apache.org>, Balu Vellanki <bvellanki@hortonworks.com>, Venkat Ranganathan <n.r.v@live.com>

Closes #27 from yzheng-hortonworks/FALCON-1627 and squashes the following commits:

9e13745 [yzheng-hortonworks] throw exception if no super user for adf in the startup properties
63eedfd [yzheng-hortonworks] polish log (review from peeyushb)
e5a1d97 [yzheng-hortonworks] review from PraveenAdlakha and peeyushb
2090aaf [yzheng-hortonworks] review from sandeepSamudrala
4c5f2b2 [yzheng-hortonworks] FALCON-1627 Provider integration with Azure Data Factory pipelines


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/ad18b024
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/ad18b024
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/ad18b024

Branch: refs/heads/master
Commit: ad18b024f125fb8411a4331fedbfc21b5186dbb9
Parents: 9979a1f
Author: yzheng-hortonworks <yzheng@hortonworks.com>
Authored: Fri Feb 19 10:05:27 2016 +0530
Committer: peeyush b <pbishnoi@hortonworks.com>
Committed: Fri Feb 19 10:05:27 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 addons/adf/README                               |  59 ++
 addons/adf/pom.xml                              | 112 ++++
 .../apache/falcon/adfservice/ADFHiveJob.java    | 123 ++++
 .../org/apache/falcon/adfservice/ADFJob.java    | 556 +++++++++++++++++++
 .../apache/falcon/adfservice/ADFJobFactory.java |  43 ++
 .../org/apache/falcon/adfservice/ADFPigJob.java |  70 +++
 .../falcon/adfservice/ADFProviderService.java   | 370 ++++++++++++
 .../falcon/adfservice/ADFReplicationJob.java    |  71 +++
 .../falcon/adfservice/ADFScheduledExecutor.java |  71 +++
 .../org/apache/falcon/adfservice/DataFeed.java  | 110 ++++
 .../java/org/apache/falcon/adfservice/Feed.java |  39 ++
 .../org/apache/falcon/adfservice/Process.java   | 148 +++++
 .../org/apache/falcon/adfservice/TableFeed.java | 125 +++++
 .../adfservice/util/ADFJsonConstants.java       |  73 +++
 .../apache/falcon/adfservice/util/FSUtils.java  | 102 ++++
 addons/hivedr/pom.xml                           |   7 +-
 common/src/main/resources/startup.properties    |  30 +-
 pom.xml                                         |  30 +-
 src/conf/startup.properties                     |  30 +-
 webapp/pom.xml                                  |  14 +
 21 files changed, 2180 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/ad18b024/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 191f641..e311bad 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,8 @@ Trunk
 
     FALCON-1230 Data based notification Service to notify execution instances when data becomes available(Pavan Kumar Kolamuri via Ajay Yadava)
 
+    FALCON-1627 Provider integration with Azure Data Factory pipelines (Ying Zheng, Venkat Ranganathan, Sowmya Ramesh)
+
   IMPROVEMENTS
     FALCON-1584 Falcon allows invalid hadoop queue name for schedulable feed entities (Venkatesan Ramachandran via Balu Vellanki)
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/ad18b024/addons/adf/README
----------------------------------------------------------------------
diff --git a/addons/adf/README b/addons/adf/README
new file mode 100644
index 0000000..39883b8
--- /dev/null
+++ b/addons/adf/README
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+ADF Provider
+=======================
+
+
+Overview
+---------
+
+This integration allows Microsoft Azure Data Factory pipelines to invoke Falcon activities
+(i.e. replication, Hive and Pig processing work), so users can build hybrid Hadoop data pipelines
+leveraging on-premises Hadoop clusters and cloud-based Cortana Analytics services
+like HDInsight Hadoop clusters and Azure Machine Learning.
+
+
+Usage
+---------
+
+Falcon reads Azure Service Bus credentials from conf/startup.properties when it starts.
+The credentials must therefore be added before starting Falcon,
+and Falcon must be restarted whenever the credentials change.
+
+Example:
+
+######### ADF Configurations start #########
+
+# A String object that represents the namespace
+*.microsoft.windowsazure.services.servicebus.namespace=hwtransport
+
+# Request and status queues on the namespace
+*.microsoft.windowsazure.services.servicebus.requestqueuename=adfrequest
+*.microsoft.windowsazure.services.servicebus.statusqueuename=adfstatus
+
+# A String object that contains the SAS key name
+*.microsoft.windowsazure.services.servicebus.sasKeyName=RootManageSharedAccessKey
+
+# A String object that contains the SAS key
+*.microsoft.windowsazure.services.servicebus.sasKey=4kt2x6yEoWZZSFZofyXEoxly0knHL7FPMqLD14ov1jo=
+
+# A String object containing the base URI that is added to your Service Bus namespace to form the URI to connect
+# to the Service Bus service. To access the default public Azure service, pass ".servicebus.windows.net"
+*.microsoft.windowsazure.services.servicebus.serviceBusRootUri=.servicebus.windows.net
+
+# Service bus polling frequency (in seconds)
+*.microsoft.windowsazure.services.servicebus.polling.frequency=60
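Because these settings are read once at startup, a loader can validate them early. The sketch below reads the README keys with java.util.Properties and derives two values the README describes: the Service Bus host, formed by appending serviceBusRootUri to the namespace, and the polling interval, which the README gives in seconds. This is an assumption-laden sketch. Falcon's own startup-property loader, including how it handles the "*." domain prefix, is not reproduced here, and the class and helper names are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/**
 * Hypothetical reader for the ADF Service Bus settings listed in the README.
 * Keys are looked up verbatim, including the "*." prefix.
 */
public class AdfServiceBusConfig {
    public static final String PREFIX = "*.microsoft.windowsazure.services.servicebus.";

    // Parse startup.properties-style text; '#' lines are treated as comments.
    public static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Fail fast when a setting is missing or blank.
    public static String required(Properties props, String key) {
        String value = props.getProperty(PREFIX + key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing ADF setting: " + PREFIX + key);
        }
        return value.trim();
    }

    // serviceBusRootUri is appended to the namespace to form the host name.
    public static String serviceBusHost(Properties props) {
        return required(props, "namespace") + required(props, "serviceBusRootUri");
    }

    // polling.frequency is in seconds; convert it to milliseconds.
    public static long pollingMillis(Properties props) {
        return Long.parseLong(required(props, "polling.frequency")) * 1000L;
    }

    public static void main(String[] args) {
        String text = "# ADF Configurations\n"
                + PREFIX + "namespace=hwtransport\n"
                + PREFIX + "requestqueuename=adfrequest\n"
                + PREFIX + "statusqueuename=adfstatus\n"
                + PREFIX + "serviceBusRootUri=.servicebus.windows.net\n"
                + PREFIX + "polling.frequency=60\n";
        Properties props = load(text);
        System.out.println(serviceBusHost(props));               // hwtransport.servicebus.windows.net
        System.out.println(pollingMillis(props));                // 60000
        System.out.println(required(props, "requestqueuename")); // adfrequest
    }
}
```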

http://git-wip-us.apache.org/repos/asf/falcon/blob/ad18b024/addons/adf/pom.xml
----------------------------------------------------------------------
diff --git a/addons/adf/pom.xml b/addons/adf/pom.xml
new file mode 100644
index 0000000..898791e
--- /dev/null
+++ b/addons/adf/pom.xml
@@ -0,0 +1,112 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.falcon</groupId>
+        <artifactId>falcon-main</artifactId>
+        <version>0.10-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <artifactId>falcon-adf</artifactId>
+    <description>Apache Falcon ADF Integration</description>
+    <name>Apache Falcon ADF Integration</name>
+    <packaging>jar</packaging>
+
+    <properties>
+        <azure.version>0.8.0</azure.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-common</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.servlet.jsp</groupId>
+                    <artifactId>jsp-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-prism</artifactId>
+            <version>${project.version}</version>
+            <classifier>classes</classifier>
+        </dependency>
+
+        <dependency>
+            <groupId>com.microsoft.azure</groupId>
+            <artifactId>azure-servicebus</artifactId>
+            <version>${azure.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.codehaus.jackson</groupId>
+                    <artifactId>jackson-core-asl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.codehaus.jackson</groupId>
+                    <artifactId>jackson-mapper-asl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpclient</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+    <profiles>
+        <profile>
+            <id>hadoop-2</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </dependency>
+            </dependencies>
+        </profile>
+    </profiles>
+
+    <build>
+        <sourceDirectory>${basedir}/src/main/java</sourceDirectory>
+        <!--<testSourceDirectory>${basedir}/src/test/java</testSourceDirectory>-->
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/falcon/blob/ad18b024/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFHiveJob.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFHiveJob.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFHiveJob.java
new file mode 100644
index 0000000..6412c73
--- /dev/null
+++ b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFHiveJob.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.adfservice;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.adfservice.util.ADFJsonConstants;
+import org.apache.falcon.FalconException;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Azure ADF Hive Job.
+ */
+public class ADFHiveJob extends ADFJob {
+    private static final String HIVE_SCRIPT_EXTENSION = ".hql";
+    private static final String ENGINE_TYPE = "hive";
+    private static final String INPUT_FEED_SUFFIX = "-hive-input-feed";
+    private static final String OUTPUT_FEED_SUFFIX = "-hive-output-feed";
+
+    private String hiveScriptPath;
+    private TableFeed inputFeed;
+    private TableFeed outputFeed;
+
+    public ADFHiveJob(String message, String id) throws FalconException {
+        super(message, id);
+        type = JobType.HIVE;
+        inputFeed = getInputTableFeed();
+        outputFeed = getOutputTableFeed();
+        hiveScriptPath = activityHasScriptPath() ? getScriptPath() : createScriptFile(HIVE_SCRIPT_EXTENSION);
+    }
+
+    @Override
+    public void startJob() throws FalconException {
+        startProcess(inputFeed, outputFeed, ENGINE_TYPE, hiveScriptPath);
+    }
+
+    @Override
+    public void cleanup() throws FalconException {
+        cleanupProcess(inputFeed, outputFeed);
+    }
+
+    private TableFeed getInputTableFeed() throws FalconException {
+        return getTableFeed(jobEntityName() + INPUT_FEED_SUFFIX, getInputTables().get(0),
+                getTableCluster(getInputTables().get(0)));
+    }
+
+    private TableFeed getOutputTableFeed() throws FalconException {
+        return getTableFeed(jobEntityName() + OUTPUT_FEED_SUFFIX, getOutputTables().get(0),
+                getTableCluster(getOutputTables().get(0)));
+    }
+
+    private TableFeed getTableFeed(final String feedName, final String tableName,
+                                   final String clusterName) throws FalconException {
+        JSONObject tableExtendedProperties = getTableExtendedProperties(tableName);
+        String tableFeedName;
+        String partitions;
+
+        try {
+            tableFeedName = tableExtendedProperties.getString(ADFJsonConstants.ADF_REQUEST_TABLE_NAME);
+            if (StringUtils.isBlank(tableFeedName)) {
+                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_TABLE_NAME + " cannot"
+                        + " be empty in ADF request.");
+            }
+            partitions = tableExtendedProperties.getString(ADFJsonConstants.ADF_REQUEST_TABLE_PARTITION);
+            if (StringUtils.isBlank(partitions)) {
+                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_TABLE_PARTITION + " cannot"
+                        + " be empty in ADF request.");
+            }
+        } catch (JSONException e) {
+            throw new FalconException("Error while parsing ADF JSON message: " + tableExtendedProperties, e);
+        }
+
+        return new TableFeed.Builder().withFeedName(feedName).withFrequency(frequency)
+                .withClusterName(clusterName).withStartTime(startTime).withEndTime(endTime).
+                        withAclOwner(proxyUser).withTableName(tableFeedName).withPartitions(partitions).build();
+    }
+
+    private JSONObject getTableExtendedProperties(final String tableName) throws FalconException {
+        JSONObject table = tablesMap.get(tableName);
+        if (table == null) {
+            throw new FalconException("JSON object table " + tableName + " not found in ADF request.");
+        }
+
+        try {
+            JSONObject tableProperties = table.getJSONObject(ADFJsonConstants.ADF_REQUEST_PROPERTIES);
+            if (tableProperties == null) {
+                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_PROPERTIES
+                        + " not found in ADF request.");
+            }
+            JSONObject tablesLocation = tableProperties.getJSONObject(ADFJsonConstants.ADF_REQUEST_LOCATION);
+            if (tablesLocation == null) {
+                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_LOCATION
+                        + " not found in ADF request.");
+            }
+
+            JSONObject tableExtendedProperties = tablesLocation.getJSONObject(ADFJsonConstants.
+                    ADF_REQUEST_EXTENDED_PROPERTIES);
+            if (tableExtendedProperties == null) {
+                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_EXTENDED_PROPERTIES
+                        + " not found in ADF request.");
+            }
+            return tableExtendedProperties;
+        } catch (JSONException e) {
+            throw new FalconException("Error while parsing ADF JSON message: " + table, e);
+        }
+    }
+}
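ADFHiveJob names its input and output feeds by appending a suffix to jobEntityName(). The ADFJob diff below defines jobEntityName() as ADF_JOB_ENTITY_NAME_PREFIX + id, and getSessionID() reverses that mapping. The following stdlib-only sketch shows the round trip. It copies the prefix and suffix constants from the diff but substitutes IllegalArgumentException for FalconException. The class name and the session id "42" are hypothetical.

```java
/**
 * Hypothetical sketch of the ADF entity naming used by ADFJob and ADFHiveJob.
 */
public class AdfEntityNames {
    public static final String ADF_ENTITY_NAME_PREFIX = "ADF-";
    public static final String ADF_JOB_ENTITY_NAME_PREFIX = ADF_ENTITY_NAME_PREFIX + "JOB-";
    public static final String INPUT_FEED_SUFFIX = "-hive-input-feed";
    public static final String OUTPUT_FEED_SUFFIX = "-hive-output-feed";

    public static String jobEntityName(String sessionId) {
        return ADF_JOB_ENTITY_NAME_PREFIX + sessionId;
    }

    public static boolean isADFJobEntity(String entityName) {
        return entityName.startsWith(ADF_JOB_ENTITY_NAME_PREFIX);
    }

    // Inverse of jobEntityName: strip the job prefix to recover the session id.
    public static String getSessionID(String entityName) {
        if (!isADFJobEntity(entityName)) {
            throw new IllegalArgumentException("The entity, " + entityName + ", is not an ADF Job Entity.");
        }
        return entityName.substring(ADF_JOB_ENTITY_NAME_PREFIX.length());
    }

    public static void main(String[] args) {
        String process = jobEntityName("42");
        String inputFeed = process + INPUT_FEED_SUFFIX;
        System.out.println(process);                   // ADF-JOB-42
        System.out.println(inputFeed);                 // ADF-JOB-42-hive-input-feed
        System.out.println(getSessionID(process));     // 42
        // The feed name also carries the job prefix, so the prefix check accepts it too.
        System.out.println(isADFJobEntity(inputFeed)); // true
        System.out.println(getSessionID(inputFeed));   // 42-hive-input-feed
    }
}
```

In this sketch, the Hive feed names also start with ADF-JOB-, so a prefix check alone does not distinguish the Hive process from its feeds. Only the job entity name itself yields the bare session id.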

http://git-wip-us.apache.org/repos/asf/falcon/blob/ad18b024/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFJob.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFJob.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFJob.java
new file mode 100644
index 0000000..5d81338
--- /dev/null
+++ b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFJob.java
@@ -0,0 +1,556 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.adfservice;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.falcon.adfservice.util.ADFJsonConstants;
+import org.apache.falcon.adfservice.util.FSUtils;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.resource.AbstractSchedulableEntityManager;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.hadoop.fs.Path;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for Azure ADF jobs.
+ */
+public abstract class ADFJob {
+    private static final Logger LOG = LoggerFactory.getLogger(ADFJob.class);
+
+    // name prefix for all adf related entity, e.g. an adf hive process and the feeds associated with it
+    public static final String ADF_ENTITY_NAME_PREFIX = "ADF-";
+    public static final int ADF_ENTITY_NAME_PREFIX_LENGTH = ADF_ENTITY_NAME_PREFIX.length();
+    // name prefix for all adf related job entity, i.e. adf hive/pig process and replication feed
+    public static final String ADF_JOB_ENTITY_NAME_PREFIX = ADF_ENTITY_NAME_PREFIX + "JOB-";
+    public static final int ADF_JOB_ENTITY_NAME_PREFIX_LENGTH = ADF_JOB_ENTITY_NAME_PREFIX.length();
+
+    public static final String TEMPLATE_PATH_PREFIX = "/apps/falcon/adf/";
+    public static final String PROCESS_SCRIPTS_PATH = TEMPLATE_PATH_PREFIX
+            + Path.SEPARATOR + "generatedscripts";
+    private static final String DEFAULT_FREQUENCY = "days(1)";
+
+    public static boolean isADFJobEntity(String entityName) {
+        return entityName.startsWith(ADF_JOB_ENTITY_NAME_PREFIX);
+    }
+
+    public static String getSessionID(String entityName) throws FalconException {
+        if (!isADFJobEntity(entityName)) {
+            throw new FalconException("The entity, " + entityName + ", is not an ADF Job Entity.");
+        }
+        return entityName.substring(ADF_JOB_ENTITY_NAME_PREFIX_LENGTH);
+    }
+
+    /**
+     * Enum for job type.
+     */
+    public static enum JobType {
+        HIVE, PIG, REPLICATION
+    }
+
+    private static enum RequestType {
+        HADOOPMIRROR, HADOOPHIVE, HADOOPPIG
+    }
+
+    public static JobType getJobType(String msg) throws FalconException {
+        try {
+            JSONObject obj = new JSONObject(msg);
+            JSONObject activity = obj.getJSONObject(ADFJsonConstants.ADF_REQUEST_ACTIVITY);
+            if (activity == null) {
+                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_ACTIVITY + " not found in ADF"
+                        + " request.");
+            }
+
+            JSONObject activityProperties = activity.getJSONObject(ADFJsonConstants.ADF_REQUEST_TRANSFORMATION);
+            if (activityProperties == null) {
+                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_TRANSFORMATION + " not found "
+                        + "in ADF request.");
+            }
+
+            String type = activityProperties.getString(ADFJsonConstants.ADF_REQUEST_TYPE);
+            if (StringUtils.isBlank(type)) {
+                throw new FalconException(ADFJsonConstants.ADF_REQUEST_TYPE + " not found in ADF request msg");
+            }
+
+            switch (RequestType.valueOf(type.toUpperCase())) {
+            case HADOOPMIRROR:
+                return JobType.REPLICATION;
+            case HADOOPHIVE:
+                return JobType.HIVE;
+            case HADOOPPIG:
+                return JobType.PIG;
+            default:
+                throw new FalconException("Unrecognized ADF job type: " + type);
+            }
+        } catch (JSONException e) {
+            throw new FalconException("Error while parsing ADF JSON message: " + msg, e);
+        }
+    }
+
+    public abstract void startJob() throws FalconException;
+    public abstract void cleanup() throws FalconException;
+
+    protected JSONObject message;
+    protected JSONObject activity;
+    protected JSONObject activityExtendedProperties;
+    protected String id;
+    protected JobType type;
+    protected String startTime, endTime;
+    protected String frequency;
+    protected String proxyUser;
+    protected long timeout;
+    protected ADFJobManager jobManager = new ADFJobManager();
+
+    private Map<String, JSONObject> linkedServicesMap = new HashMap<String, JSONObject>();
+    protected Map<String, JSONObject> tablesMap = new HashMap<String, JSONObject>();
+
+    public ADFJob(String msg, String id) throws FalconException {
+        this.id = id;
+        FSUtils.createDir(new Path(PROCESS_SCRIPTS_PATH));
+        try {
+            message = new JSONObject(msg);
+
+            frequency = DEFAULT_FREQUENCY;
+            startTime = transformTimeFormat(message.getString(ADFJsonConstants.ADF_REQUEST_START_TIME));
+            endTime = transformTimeFormat(message.getString(ADFJsonConstants.ADF_REQUEST_END_TIME));
+
+            JSONArray linkedServices = message.getJSONArray(ADFJsonConstants.ADF_REQUEST_LINKED_SERVICES);
+            for (int i = 0; i < linkedServices.length(); i++) {
+                JSONObject linkedService = linkedServices.getJSONObject(i);
+                linkedServicesMap.put(linkedService.getString(ADFJsonConstants.ADF_REQUEST_NAME), linkedService);
+            }
+
+            JSONArray tables = message.getJSONArray(ADFJsonConstants.ADF_REQUEST_TABLES);
+            for (int i = 0; i < tables.length(); i++) {
+                JSONObject table = tables.getJSONObject(i);
+                tablesMap.put(table.getString(ADFJsonConstants.ADF_REQUEST_NAME), table);
+            }
+
+            // Set the activity extended properties
+            activity = message.getJSONObject(ADFJsonConstants.ADF_REQUEST_ACTIVITY);
+            if (activity == null) {
+                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_ACTIVITY + " not found in ADF"
+                        + " request.");
+            }
+
+            JSONObject policy = activity.getJSONObject(ADFJsonConstants.ADF_REQUEST_POLICY);
+            /* Is policy mandatory? */
+            if (policy == null) {
+                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_POLICY + " not found"
+                        + " in ADF request.");
+            }
+            String adfTimeout = policy.getString(ADFJsonConstants.ADF_REQUEST_TIMEOUT);
+            if (StringUtils.isBlank(adfTimeout)) {
+                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_TIMEOUT + " not found"
+                        + " in ADF request.");
+            }
+            timeout = parseADFRequestTimeout(adfTimeout);
+
+            JSONObject activityProperties = activity.getJSONObject(ADFJsonConstants.ADF_REQUEST_TRANSFORMATION);
+            if (activityProperties == null) {
+                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_TRANSFORMATION + " not found"
+                        + " in ADF request.");
+            }
+
+            activityExtendedProperties = activityProperties.getJSONObject(
+                    ADFJsonConstants.ADF_REQUEST_EXTENDED_PROPERTIES);
+            if (activityExtendedProperties == null) {
+                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_EXTENDED_PROPERTIES + " not"
+                        + " found in ADF request.");
+            }
+
+            // should be called after setting activityExtendedProperties
+            proxyUser = getRunAsUser();
+
+            // log in the user
+            CurrentUser.authenticate(proxyUser);
+        } catch (JSONException e) {
+            throw new FalconException("Error while parsing ADF JSON message: " + msg, e);
+        }
+    }
+
+    public String jobEntityName() {
+        return ADF_JOB_ENTITY_NAME_PREFIX + id;
+    }
+
+    public JobType jobType() {
+        return type;
+    }
+
+    protected String getClusterName(String linkedServiceName) throws FalconException {
+        JSONObject linkedService = linkedServicesMap.get(linkedServiceName);
+        if (linkedService == null) {
+            throw new FalconException("Linked service " + linkedServiceName + " not found in ADF request.");
+        }
+
+        try {
+            return linkedService.getJSONObject(ADFJsonConstants.ADF_REQUEST_PROPERTIES)
+                    .getJSONObject(ADFJsonConstants.ADF_REQUEST_EXTENDED_PROPERTIES)
+                    .getString(ADFJsonConstants.ADF_REQUEST_CLUSTER_NAME);
+        } catch (JSONException e) {
+            throw new FalconException("Error while parsing linked service " + linkedServiceName + " in ADF request.");
+        }
+    }
+
+    protected String getRunAsUser() throws FalconException {
+        if (activityExtendedProperties.has(ADFJsonConstants.ADF_REQUEST_RUN_ON_BEHALF_USER)) {
+            String runAsUser = null;
+            try {
+                runAsUser = activityExtendedProperties.getString(ADFJsonConstants.ADF_REQUEST_RUN_ON_BEHALF_USER);
+            } catch (JSONException e) {
+                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_RUN_ON_BEHALF_USER + " not"
+                        + " found in ADF request.");
+            }
+
+            if (StringUtils.isBlank(runAsUser)) {
+                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_RUN_ON_BEHALF_USER + " in"
+                        + " ADF request activity extended properties cannot be empty.");
+            }
+            return runAsUser;
+        } else {
+            String hadoopLinkedService = getHadoopLinkedService();
+            JSONObject linkedService = linkedServicesMap.get(hadoopLinkedService);
+            if (linkedService == null) {
+                throw new FalconException("JSON object " + hadoopLinkedService + " not"
+                        + " found in ADF request.");
+            }
+
+            try {
+                return linkedService.getJSONObject(ADFJsonConstants.ADF_REQUEST_PROPERTIES)
+                        .getJSONObject(ADFJsonConstants.ADF_REQUEST_EXTENDED_PROPERTIES)
+                        .getString(ADFJsonConstants.ADF_REQUEST_RUN_ON_BEHALF_USER);
+            } catch (JSONException e) {
+                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_RUN_ON_BEHALF_USER + " not"
+                        + " found in ADF request.");
+            }
+        }
+    }
+
+    protected List<String> getInputTables() throws FalconException {
+        List<String> tables = new ArrayList<String>();
+        try {
+            JSONArray inputs = message.getJSONObject(ADFJsonConstants.ADF_REQUEST_ACTIVITY)
+                    .getJSONArray(ADFJsonConstants.ADF_REQUEST_INPUTS);
+            for (int i = 0; i < inputs.length(); i++) {
+                tables.add(inputs.getJSONObject(i).getString(ADFJsonConstants.ADF_REQUEST_NAME));
+            }
+        } catch (JSONException e) {
+            throw new FalconException("Error while reading input table names in ADF request.");
+        }
+        return tables;
+    }
+
+    protected List<String> getOutputTables() throws FalconException {
+        List<String> tables = new ArrayList<String>();
+        try {
+            JSONArray outputs = message.getJSONObject(ADFJsonConstants.ADF_REQUEST_ACTIVITY)
+                    .getJSONArray(ADFJsonConstants.ADF_REQUEST_OUTPUTS);
+            for (int i = 0; i < outputs.length(); i++) {
+                tables.add(outputs.getJSONObject(i).getString(ADFJsonConstants.ADF_REQUEST_NAME));
+            }
+        } catch (JSONException e) {
+            throw new FalconException("Error while reading output table names in ADF request.");
+        }
+        return tables;
+    }
+
+    protected String getADFTablePath(String tableName) throws FalconException {
+        JSONObject table = tablesMap.get(tableName);
+        if (table == null) {
+            throw new FalconException("JSON object " + tableName + " not"
+                    + " found in ADF request.");
+        }
+
+        try {
+            JSONObject location = table.getJSONObject(ADFJsonConstants.ADF_REQUEST_PROPERTIES)
+                    .getJSONObject(ADFJsonConstants.ADF_REQUEST_LOCATION);
+            String requestType = location.getString(ADFJsonConstants.ADF_REQUEST_TYPE);
+            if (requestType.equals(ADFJsonConstants.ADF_REQUEST_LOCATION_TYPE_AZURE_BLOB)) {
+                String blobPath = location.getString(ADFJsonConstants.ADF_REQUEST_FOLDER_PATH);
+                int index = blobPath.indexOf('/');
+                if (index == -1) {
+                    throw new FalconException("Invalid azure blob path: " + blobPath);
+                }
+
+                String linkedServiceName = location.getString(ADFJsonConstants.ADF_REQUEST_LINKED_SERVICE_NAME);
+                JSONObject linkedService = linkedServicesMap.get(linkedServiceName);
+                if (linkedService == null) {
+                    throw new FalconException("Can't find linked service " + linkedServiceName + " for azure blob");
+                }
+                String connectionString = linkedService.getJSONObject(ADFJsonConstants.ADF_REQUEST_PROPERTIES)
+                        .getString(ADFJsonConstants.ADF_REQUEST_CONNECTION_STRING);
+                int accountNameIndex = connectionString.indexOf(ADFJsonConstants.ADF_REQUEST_BLOB_ACCOUNT_NAME)
+                        + ADFJsonConstants.ADF_REQUEST_BLOB_ACCOUNT_NAME.length();
+                String accountName = connectionString.substring(accountNameIndex,
+                        connectionString.indexOf(';', accountNameIndex));
+
+                StringBuilder blobUrl = new StringBuilder("wasb://")
+                        .append(blobPath.substring(0, index)).append("@")
+                        .append(accountName).append(".blob.core.windows.net")
+                        .append(blobPath.substring(index));
+                return blobUrl.toString();
+            }
+            return location.getJSONObject(ADFJsonConstants.ADF_REQUEST_EXTENDED_PROPERTIES)
+                    .getString(ADFJsonConstants.ADF_REQUEST_FOLDER_PATH);
+        } catch (JSONException e) {
+            throw new FalconException("Error while parsing ADF JSON message: " + tableName, e);
+        }
+    }
+
+    protected String getTableCluster(String tableName) throws FalconException {
+        JSONObject table = tablesMap.get(tableName);
+        if (table == null) {
+            throw new FalconException("Table " + tableName + " not found in ADF request.");
+        }
+
+        try {
+            String linkedServiceName = table.getJSONObject(ADFJsonConstants.ADF_REQUEST_PROPERTIES)
+                    .getJSONObject(ADFJsonConstants.ADF_REQUEST_LOCATION)
+                    .getString(ADFJsonConstants.ADF_REQUEST_LINKED_SERVICE_NAME);
+            return getClusterName(linkedServiceName);
+        } catch (JSONException e) {
+            throw new FalconException("Error while parsing table cluster " + tableName + " in ADF request.", e);
+        }
+    }
+
+    protected boolean activityHasScriptPath() throws FalconException {
+        if (JobType.REPLICATION == jobType()) {
+            return false;
+        }
+
+        return activityExtendedProperties.has(
+                ADFJsonConstants.ADF_REQUEST_SCRIPT_PATH);
+    }
+
+    protected String getScriptPath() throws FalconException {
+        if (!activityHasScriptPath()) {
+            throw new FalconException("JSON object does not have object: "
+                    + ADFJsonConstants.ADF_REQUEST_SCRIPT_PATH);
+        }
+
+        try {
+            String scriptPath = activityExtendedProperties.getString(ADFJsonConstants.ADF_REQUEST_SCRIPT_PATH);
+            if (StringUtils.isBlank(scriptPath)) {
+                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_SCRIPT_PATH + " not"
+                        + " found or empty in ADF request.");
+            }
+            return scriptPath;
+        } catch (JSONException jsonException) {
+            throw new FalconException("Error while parsing ADF JSON object: "
+                    + activityExtendedProperties, jsonException);
+        }
+    }
+
+    protected String getScriptContent() throws FalconException {
+        if (activityHasScriptPath()) {
+            throw new FalconException("JSON object does not have object: " + ADFJsonConstants.ADF_REQUEST_SCRIPT);
+        }
+        try {
+            String script = activityExtendedProperties.getString(ADFJsonConstants.ADF_REQUEST_SCRIPT);
+            if (StringUtils.isBlank(script)) {
+                throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_SCRIPT + " cannot"
+                        + " be empty in ADF request.");
+            }
+            return script;
+        } catch (JSONException jsonException) {
+            throw new FalconException("Error while parsing ADF JSON object: "
+                    + activityExtendedProperties, jsonException);
+        }
+    }
+
+    protected String getClusterNameToRunProcessOn() throws FalconException {
+        return getClusterName(getHadoopLinkedService());
+    }
+
+    protected Entity submitAndScheduleJob(String entityType, String msg) throws FalconException {
+        Entity entity = jobManager.submitJob(entityType, msg);
+        jobManager.scheduleJob(entityType, jobEntityName());
+        return entity;
+    }
+
+    private String getHadoopLinkedService() throws FalconException {
+        String hadoopLinkedService;
+        try {
+            hadoopLinkedService = activity.getString(ADFJsonConstants.ADF_REQUEST_LINKED_SERVICE_NAME);
+        } catch (JSONException jsonException) {
+            throw new FalconException("Error while parsing ADF JSON object: "
+                    + activity, jsonException);
+        }
+
+        if (StringUtils.isBlank(hadoopLinkedService)) {
+            throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_LINKED_SERVICE_NAME
+                    + " in the activity cannot be empty in ADF request.");
+        }
+        return hadoopLinkedService;
+    }
+
+    protected void startProcess(Feed inputFeed, Feed outputFeed,
+                                String engineType, String scriptPath) throws FalconException {
+        // submit input/output feeds
+        LOG.info("submitting input feed {} for {} process", inputFeed.getName(), engineType);
+        jobManager.submitJob(EntityType.FEED.name(), inputFeed.getEntityxml());
+
+        LOG.info("submitting output feed {} for {} process", outputFeed.getName(), engineType);
+        jobManager.submitJob(EntityType.FEED.name(), outputFeed.getEntityxml());
+
+        // submit and schedule process
+        String processRequest = new Process.Builder().withProcessName(jobEntityName()).withFrequency(frequency)
+                .withStartTime(startTime).withEndTime(endTime).withClusterName(getClusterNameToRunProcessOn())
+                .withInputFeedName(inputFeed.getName()).withOutputFeedName(outputFeed.getName())
+                .withEngineType(engineType).withWFPath(scriptPath).withAclOwner(proxyUser)
+                .build().getEntityxml();
+
+        LOG.info("submitting/scheduling {} process: {}", engineType, processRequest);
+        submitAndScheduleJob(EntityType.PROCESS.name(), processRequest);
+        LOG.info("submitted and scheduled {} process: {}", engineType, jobEntityName());
+    }
+
+    protected void cleanupProcess(Feed inputFeed, Feed outputFeed) throws FalconException {
+        // delete the entities. Should be called after the job execution success/failure.
+        jobManager.deleteEntity(EntityType.PROCESS.name(), jobEntityName());
+        jobManager.deleteEntity(EntityType.FEED.name(), inputFeed.getName());
+        jobManager.deleteEntity(EntityType.FEED.name(), outputFeed.getName());
+
+        // delete script file
+        FSUtils.removeDir(new Path(ADFJob.PROCESS_SCRIPTS_PATH, jobEntityName()));
+    }
+
+    protected String createScriptFile(String fileExt) throws FalconException {
+        String content = getScriptContent();
+
+        // create dir; dir path is unique as job name is always unique
+        final Path dir = new Path(ADFJob.PROCESS_SCRIPTS_PATH, jobEntityName());
+        FSUtils.createDir(dir);
+
+        // create script file
+        final Path path = new Path(dir, jobEntityName() + fileExt);
+        return FSUtils.createFile(path, content);
+    }
+
+    private static long parseADFRequestTimeout(String timeout) throws FalconException {
+        timeout = timeout.trim();
+        //  [ws][-]{ d | d.hh:mm[:ss[.ff]] | hh:mm[:ss[.ff]] }[ws]
+        if (timeout.startsWith("-")) {
+            return -1;
+        }
+
+        long totalMinutes = 0;
+        String [] dotParts = timeout.split(Pattern.quote("."));
+        if (dotParts.length == 1) {
+            // no '.' present, so neither a "d." prefix nor ff is used
+            // check whether only d is present
+            // formats can be d | hh:mm[:ss]
+            String[] parts = timeout.split(":");
+            if (parts.length == 1) {
+                // only day. Convert days to minutes
+                return Integer.parseInt(parts[0]) * 1440;
+            } else {
+                // hh:mm[:ss]
+                return computeMinutes(parts);
+            }
+        }
+
+        // if . is present, formats can be d.hh:mm[:ss[.ff]] | hh:mm[:ss[.ff]]
+        if (dotParts.length == 2) {
+            // can be d.hh:mm[:ss] or hh:mm[:ss[.ff]]
+            // check if first part has colons
+            String [] parts = dotParts[0].split(":");
+            if (parts.length == 1) {
+                // format is d.hh:mm[:ss]
+                totalMinutes = Integer.parseInt(dotParts[0]) * 1440;
+                parts = dotParts[1].split(":");
+                totalMinutes += computeMinutes(parts);
+                return totalMinutes;
+            } else {
+                // format is hh:mm[:ss[.ff]]
+                parts = dotParts[0].split(":");
+                totalMinutes += computeMinutes(parts);
+                // round the fraction ff up to the next minute
+                totalMinutes += 1;
+                return totalMinutes;
+            }
+        } else if (dotParts.length == 3) {
+            // will be d.hh:mm[:ss[.ff]]
+            totalMinutes = Integer.parseInt(dotParts[0]) * 1440;
+            String [] parts = dotParts[1].split(":");
+            totalMinutes += computeMinutes(parts);
+            // round the fraction ff up to the next minute
+            totalMinutes += 1;
+            return totalMinutes;
+        } else {
+            throw new FalconException("Error parsing policy timeout: " + timeout);
+        }
+    }
+
+    // format hh:mm[:ss]
+    private static long computeMinutes(String[] parts) {
+        // hh:mm[:ss]
+        int totalMinutes = Integer.parseInt(parts[0]) * 60;
+        totalMinutes += Integer.parseInt(parts[1]);
+        if (parts.length == 3) {
+            // round the seconds up to the next minute
+            totalMinutes += 1;
+        }
+        return totalMinutes;
+    }
+
+    private static String transformTimeFormat(String adfTime) {
+        return adfTime.substring(0, adfTime.length()-4) + "Z";
+    }
+
+    protected class ADFJobManager extends AbstractSchedulableEntityManager {
+        public Entity submitJob(String entityType, String msg) throws FalconException {
+            try {
+                InputStream stream = IOUtils.toInputStream(msg);
+                Entity entity = submitInternal(stream, entityType, proxyUser);
+                return entity;
+            } catch (Exception e) {
+                LOG.error("Error when submitting job", e);
+                throw new FalconException("Error when submitting job: " + e.toString(), e);
+            }
+        }
+
+        public void scheduleJob(String entityType, String entityName) throws FalconException {
+            try {
+                scheduleInternal(entityType, entityName, null, EntityUtil.getPropertyMap(null));
+            } catch (Exception e) {
+                LOG.error("Error when scheduling job", e);
+                throw new FalconException("Error when scheduling job: " + e.toString(), e);
+            }
+        }
+
+        public void deleteEntity(String entityType, String entityName) throws FalconException {
+            delete(entityType, entityName, null);
+        }
+    }
+}
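getADFTablePath() above turns an Azure Blob folderPath of the form container/dir/... plus the linked service's storage connection string into a wasb:// URL. The standalone sketch below shows that conversion under stated assumptions: the connection-string key is assumed to be "AccountName=" (the value of ADFJsonConstants.ADF_REQUEST_BLOB_ACCOUNT_NAME is not shown in this diff), and the class and method names are hypothetical. Unlike the committed code, it also copes with AccountName being the last entry (no trailing ';') or missing altogether.

```java
import java.util.Optional;

/** Hypothetical helper illustrating the folderPath + connection string -> wasb:// mapping. */
public class WasbUrlSketch {
    // Assumed key; Azure storage connection strings look like
    // "DefaultEndpointsProtocol=https;AccountName=...;AccountKey=...".
    private static final String ACCOUNT_NAME_KEY = "AccountName=";

    /** Extracts the storage account name, or empty if the key is absent. */
    public static Optional<String> accountName(String connectionString) {
        int start = connectionString.indexOf(ACCOUNT_NAME_KEY);
        if (start == -1) {
            return Optional.empty();
        }
        start += ACCOUNT_NAME_KEY.length();
        int end = connectionString.indexOf(';', start);
        // AccountName may be the final entry, with no terminating ';'.
        return Optional.of(end == -1 ? connectionString.substring(start)
                                     : connectionString.substring(start, end));
    }

    /** Maps "container/dir/..." to "wasb://container@account.blob.core.windows.net/dir/...". */
    public static String toWasbUrl(String blobFolderPath, String connectionString) {
        int slash = blobFolderPath.indexOf('/');
        if (slash == -1) {
            throw new IllegalArgumentException("Invalid azure blob path: " + blobFolderPath);
        }
        String account = accountName(connectionString)
                .orElseThrow(() -> new IllegalArgumentException("No AccountName in connection string"));
        return "wasb://" + blobFolderPath.substring(0, slash) + "@" + account
                + ".blob.core.windows.net" + blobFolderPath.substring(slash);
    }
}
```

For example, the folder path "adfcontainer/input/2016" with a connection string containing "AccountName=myaccount" maps to wasb://adfcontainer@myaccount.blob.core.windows.net/input/2016.
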

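parseADFRequestTimeout() above handles the TimeSpan-style format noted in its comment, [ws][-]{ d | d.hh:mm[:ss[.ff]] | hh:mm[:ss[.ff]] }[ws], by splitting on '.' and ':'. As a point of comparison, here is a minimal regex-based sketch of the same conversion to whole minutes. The class name is hypothetical, and the rounding is a deliberate variation: the committed code adds one minute whenever a seconds field is present and another whenever a fraction is present, while this sketch adds at most one minute and only when the seconds or fraction are nonzero. A negative timespan returns -1, as in the commit.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch: converts an ADF timeout string to whole minutes. */
public class AdfTimeoutSketch {
    // Groups: 1 sign, 2 bare days, 3 days prefix, 4 hh, 5 mm, 6 ss, 7 ff
    private static final Pattern TIMESPAN = Pattern.compile(
            "^\\s*(-)?(?:(\\d+)|(?:(\\d+)\\.)?(\\d{1,2}):(\\d{1,2})(?::(\\d{1,2})(?:\\.(\\d+))?)?)\\s*$");

    public static long toMinutes(String timeout) {
        Matcher m = TIMESPAN.matcher(timeout);
        if (!m.matches()) {
            throw new IllegalArgumentException("Error parsing timeout: " + timeout);
        }
        if (m.group(1) != null) {
            return -1;                                  // negative timespan
        }
        if (m.group(2) != null) {
            return Long.parseLong(m.group(2)) * 1440;   // "d" alone: days to minutes
        }
        long minutes = m.group(3) == null ? 0 : Long.parseLong(m.group(3)) * 1440;
        minutes += Long.parseLong(m.group(4)) * 60 + Long.parseLong(m.group(5));
        boolean partialSeconds = m.group(6) != null && Integer.parseInt(m.group(6)) > 0;
        boolean partialFraction = m.group(7) != null && Long.parseLong(m.group(7)) > 0;
        if (partialSeconds || partialFraction) {
            minutes += 1;                               // round a partial minute up
        }
        return minutes;
    }
}
```

Using a single anchored pattern rejects malformed input such as "1.2.3.4" up front instead of relying on the number of '.'-separated parts.

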
http://git-wip-us.apache.org/repos/asf/falcon/blob/ad18b024/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFJobFactory.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFJobFactory.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFJobFactory.java
new file mode 100644
index 0000000..ceea6a4
--- /dev/null
+++ b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFJobFactory.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.adfservice;
+
+import org.apache.falcon.FalconException;
+
+/**
+ * Azure ADF Job factory to generate ADFJob for each job type.
+ */
+public final class ADFJobFactory {
+    public static ADFJob buildADFJob(String msg, String id) throws FalconException {
+        ADFJob.JobType jobType = ADFJob.getJobType(msg);
+        switch (jobType) {
+        case REPLICATION:
+            return new ADFReplicationJob(msg, id);
+        case HIVE:
+            return new ADFHiveJob(msg, id);
+        case PIG:
+            return new ADFPigJob(msg, id);
+        default:
+            throw new FalconException("Invalid job type: " + jobType.toString());
+        }
+    }
+
+    private ADFJobFactory() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/ad18b024/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFPigJob.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFPigJob.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFPigJob.java
new file mode 100644
index 0000000..041eb48
--- /dev/null
+++ b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFPigJob.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.adfservice;
+
+import org.apache.falcon.FalconException;
+
+/**
+ * Azure ADF Pig Job.
+ */
+public class ADFPigJob extends ADFJob {
+    private static final String PIG_SCRIPT_EXTENSION = ".pig";
+    private static final String ENGINE_TYPE = "pig";
+    private static final String INPUT_FEED_SUFFIX = "-pig-input-feed";
+    private static final String OUTPUT_FEED_SUFFIX = "-pig-output-feed";
+
+    private String pigScriptPath;
+    private DataFeed inputDataFeed;
+    private DataFeed outputDataFeed;
+
+    public ADFPigJob(String message, String id) throws FalconException {
+        super(message, id);
+        type = JobType.PIG;
+        inputDataFeed = getInputFeed();
+        outputDataFeed = getOutputFeed();
+        pigScriptPath = activityHasScriptPath() ? getScriptPath() : createScriptFile(PIG_SCRIPT_EXTENSION);
+    }
+
+    @Override
+    public void startJob() throws FalconException {
+        startProcess(inputDataFeed, outputDataFeed, ENGINE_TYPE, pigScriptPath);
+    }
+
+    @Override
+    public void cleanup() throws FalconException {
+        cleanupProcess(inputDataFeed, outputDataFeed);
+    }
+
+    private DataFeed getInputFeed() throws FalconException {
+        return getFeed(jobEntityName() + INPUT_FEED_SUFFIX, getInputTables().get(0),
+                getTableCluster(getInputTables().get(0)));
+    }
+
+    private DataFeed getOutputFeed() throws FalconException {
+        return getFeed(jobEntityName() + OUTPUT_FEED_SUFFIX, getOutputTables().get(0),
+                getTableCluster(getOutputTables().get(0)));
+    }
+
+    private DataFeed getFeed(final String feedName, final String tableName,
+                             final String clusterName) throws FalconException {
+        return new DataFeed.Builder().withFeedName(feedName).withFrequency(frequency)
+                .withClusterName(clusterName).withStartTime(startTime).withEndTime(endTime)
+                .withAclOwner(proxyUser).withLocationPath(getADFTablePath(tableName)).build();
+    }
+}
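The ADFPigJob constructor above either uses the script path supplied in the ADF request or, when only inline script content is given, writes it to a per-job file via createScriptFile() (PROCESS_SCRIPTS_PATH/jobName/jobName + ".pig") and runs that. The sketch below shows this decision in isolation. It is an assumption-laden stand-in: it writes to the local filesystem with java.nio rather than Falcon's FSUtils on HDFS, and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

/** Hypothetical sketch of choosing between a given script path and inline script content. */
public class ScriptResolverSketch {

    public static String resolveScript(Optional<String> scriptPath, String inlineScript,
                                       Path scriptsRoot, String jobName, String extension) {
        // A script path in the request takes precedence; no file is written.
        if (scriptPath.isPresent()) {
            return scriptPath.get();
        }
        if (inlineScript == null || inlineScript.trim().isEmpty()) {
            throw new IllegalArgumentException("script cannot be empty in ADF request");
        }
        try {
            // One directory per job: job names are unique, so paths do not collide.
            Path dir = Files.createDirectories(scriptsRoot.resolve(jobName));
            Path file = dir.resolve(jobName + extension);
            Files.write(file, inlineScript.getBytes(StandardCharsets.UTF_8));
            return file.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Keeping the generated file under a directory named after the job mirrors how cleanupProcess() can later remove everything for that job by deleting a single directory.
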

http://git-wip-us.apache.org/repos/asf/falcon/blob/ad18b024/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFProviderService.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFProviderService.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFProviderService.java
new file mode 100644
index 0000000..3438b2f
--- /dev/null
+++ b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFProviderService.java
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.adfservice;
+
+import com.microsoft.windowsazure.Configuration;
+import com.microsoft.windowsazure.exception.ServiceException;
+import com.microsoft.windowsazure.services.servicebus.ServiceBusService;
+import com.microsoft.windowsazure.services.servicebus.models.BrokeredMessage;
+import com.microsoft.windowsazure.services.servicebus.models.ReceiveMessageOptions;
+import com.microsoft.windowsazure.services.servicebus.models.ReceiveMode;
+import com.microsoft.windowsazure.services.servicebus.models.ReceiveQueueMessageResult;
+import com.microsoft.windowsazure.services.servicebus.ServiceBusConfiguration;
+import com.microsoft.windowsazure.services.servicebus.ServiceBusContract;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.adfservice.util.ADFJsonConstants;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.resource.AbstractInstanceManager;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.resource.InstancesResult.Instance;
+import org.apache.falcon.resource.InstancesResult.WorkflowStatus;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.service.FalconService;
+import org.apache.falcon.service.Services;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionListener;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Falcon ADF provider to handle requests from Azure Data Factory.
+ */
+public class ADFProviderService implements FalconService, WorkflowExecutionListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ADFProviderService.class);
+
+    /**
+     * Constant for the service name.
+     */
+    public static final String SERVICE_NAME = ADFProviderService.class.getSimpleName();
+
+    private static final int AZURE_SERVICEBUS_RECEIVEMESSGAEOPT_TIMEOUT = 60;
+    // polling frequency in seconds
+    private static final int AZURE_SERVICEBUS_DEFAULT_POLLING_FREQUENCY = 10;
+
+    // Number of threads to handle ADF requests
+    private static final int AZURE_SERVICEBUS_REQUEST_HANDLING_THREADS = 5;
+
+    private static final String AZURE_SERVICEBUS_CONF_PREFIX = "microsoft.windowsazure.services.servicebus.";
+    private static final String AZURE_SERVICEBUS_CONF_SASKEYNAME = "sasKeyName";
+    private static final String AZURE_SERVICEBUS_CONF_SASKEY = "sasKey";
+    private static final String AZURE_SERVICEBUS_CONF_SERVICEBUSROOTURI = "serviceBusRootUri";
+    private static final String AZURE_SERVICEBUS_CONF_NAMESPACE = "namespace";
+    private static final String AZURE_SERVICEBUS_CONF_POLLING_FREQUENCY = "polling.frequency";
+    private static final String AZURE_SERVICEBUS_CONF_REQUEST_QUEUE_NAME = "requestqueuename";
+    private static final String AZURE_SERVICEBUS_CONF_STATUS_QUEUE_NAME = "statusqueuename";
+    private static final String AZURE_SERVICEBUS_CONF_SUPER_USER = "superuser";
+
+    private static final ConfigurationStore STORE = ConfigurationStore.get();
+
+    private ServiceBusContract service;
+    private ScheduledExecutorService adfScheduledExecutorService;
+    private ReceiveMessageOptions opts = new ReceiveMessageOptions();
+    private ADFInstanceManager instanceManager = new ADFInstanceManager();
+    private String requestQueueName;
+    private String statusQueueName;
+    private String superUser;
+
+    @Override
+    public String getName() {
+        return SERVICE_NAME;
+    }
+
+    @Override
+    public void init() throws FalconException {
+        // read start up properties for adf configuration
+        service = ServiceBusService.create(getServiceBusConfig());
+
+        requestQueueName = StartupProperties.get().getProperty(AZURE_SERVICEBUS_CONF_PREFIX
+                + AZURE_SERVICEBUS_CONF_REQUEST_QUEUE_NAME);
+        if (StringUtils.isBlank(requestQueueName)) {
+            throw new FalconException(AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_REQUEST_QUEUE_NAME
+                    + " property not set in startup properties. Please add it.");
+        }
+        statusQueueName = StartupProperties.get().getProperty(AZURE_SERVICEBUS_CONF_PREFIX
+                + AZURE_SERVICEBUS_CONF_STATUS_QUEUE_NAME);
+        if (StringUtils.isBlank(statusQueueName)) {
+            throw new FalconException(AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_STATUS_QUEUE_NAME
+                    + " property not set in startup properties. Please add it.");
+        }
+
+        // init opts
+        opts.setReceiveMode(ReceiveMode.PEEK_LOCK);
+        opts.setTimeout(AZURE_SERVICEBUS_RECEIVEMESSGAEOPT_TIMEOUT);
+
+        // restart handling
+        superUser = StartupProperties.get().getProperty(
+                AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_SUPER_USER);
+        if (StringUtils.isBlank(superUser)) {
+            throw new FalconException(AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_SUPER_USER
+                    + " property not set in startup properties. Please add it.");
+        }
+        CurrentUser.authenticate(superUser);
+        for (EntityType entityType : EntityType.values()) {
+            Collection<String> entities = STORE.getEntities(entityType);
+            for (String entityName : entities) {
+                updateJobStatus(entityName, entityType.toString());
+            }
+        }
+
+        Services.get().<WorkflowJobEndNotificationService>getService(
+                WorkflowJobEndNotificationService.SERVICE_NAME).registerListener(this);
+        adfScheduledExecutorService = new ADFScheduledExecutor(AZURE_SERVICEBUS_REQUEST_HANDLING_THREADS);
+        adfScheduledExecutorService.scheduleWithFixedDelay(new HandleADFRequests(), 0, getDelay(), TimeUnit.SECONDS);
+        LOG.info("Falcon ADFProvider service initialized");
+    }
+
+    private class HandleADFRequests implements Runnable {
+
+        @Override
+        public void run() {
+            String sessionID = null;
+            try {
+                LOG.info("Reading message from ADF...");
+                ReceiveQueueMessageResult resultQM =
+                        service.receiveQueueMessage(requestQueueName, opts);
+                BrokeredMessage message = resultQM.getValue();
+                if (message != null && message.getMessageId() != null) {
+                    sessionID = message.getReplyToSessionId();
+                    BufferedReader rd = new BufferedReader(
+                            new InputStreamReader(message.getBody()));
+                    StringBuilder sb = new StringBuilder();
+                    String line;
+                    while ((line = rd.readLine()) != null) {
+                        sb.append(line);
+                    }
+                    rd.close();
+                    String msg = sb.toString();
+                    LOG.info("ADF message: " + msg);
+
+                    service.deleteMessage(message);
+
+                    ADFJob job = ADFJobFactory.buildADFJob(msg, sessionID);
+                    job.startJob();
+                } else {
+                    LOG.info("No message from adf");
+                }
+            } catch (FalconException e) {
+                if (sessionID != null) {
+                    sendErrorMessage(sessionID, e.toString());
+                }
+                LOG.info(e.toString());
+            } catch (ServiceException | IOException e) {
+                LOG.info(e.toString());
+            }
+        }
+    }
+
+    private static Configuration getServiceBusConfig() throws FalconException {
+        String namespace = StartupProperties.get().getProperty(AZURE_SERVICEBUS_CONF_PREFIX
+                + AZURE_SERVICEBUS_CONF_NAMESPACE);
+        if (StringUtils.isBlank(namespace)) {
+            throw new FalconException(AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_NAMESPACE
+                    + " property not set in startup properties. Please add it.");
+        }
+
+        String sasKeyName = StartupProperties.get().getProperty(AZURE_SERVICEBUS_CONF_PREFIX
+                + AZURE_SERVICEBUS_CONF_SASKEYNAME);
+        if (StringUtils.isBlank(sasKeyName)) {
+            throw new FalconException(AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_SASKEYNAME
+                    + " property not set in startup properties. Please add it.");
+        }
+
+        String sasKey = StartupProperties.get().getProperty(AZURE_SERVICEBUS_CONF_PREFIX
+                + AZURE_SERVICEBUS_CONF_SASKEY);
+        if (StringUtils.isBlank(sasKey)) {
+            throw new FalconException(AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_SASKEY
+                    + " property not set in startup properties. Please add it.");
+        }
+
+        String serviceBusRootUri = StartupProperties.get().getProperty(AZURE_SERVICEBUS_CONF_PREFIX
+                + AZURE_SERVICEBUS_CONF_SERVICEBUSROOTURI);
+        if (StringUtils.isBlank(serviceBusRootUri)) {
+            throw new FalconException(AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_SERVICEBUSROOTURI
+                    + " property not set in startup properties. Please add it.");
+        }
+
+        LOG.info("namespace: {}, sas key name: {}, root uri: {}",
+                namespace, sasKeyName, serviceBusRootUri);
+        return ServiceBusConfiguration.configureWithSASAuthentication(namespace, sasKeyName, sasKey,
+                serviceBusRootUri);
+    }
+
+
+    // gets delay in seconds
+    private long getDelay() throws FalconException {
+        String pollingFrequencyValue = StartupProperties.get().getProperty(AZURE_SERVICEBUS_CONF_PREFIX
+                + AZURE_SERVICEBUS_CONF_POLLING_FREQUENCY);
+        long pollingFrequency;
+        try {
+            pollingFrequency = (StringUtils.isNotEmpty(pollingFrequencyValue))
+                    ? Long.parseLong(pollingFrequencyValue) : AZURE_SERVICEBUS_DEFAULT_POLLING_FREQUENCY;
+        } catch (NumberFormatException nfe) {
+            throw new FalconException("Invalid value provided for startup property "
+                    + AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_POLLING_FREQUENCY
+                    + ", please provide a valid long number", nfe);
+        }
+        return pollingFrequency;
+    }
+
+    @Override
+    public void destroy() throws FalconException {
+        Services.get().<WorkflowJobEndNotificationService>getService(
+                WorkflowJobEndNotificationService.SERVICE_NAME).unregisterListener(this);
+        adfScheduledExecutorService.shutdown();
+    }
+
+    @Override
+    public void onSuccess(WorkflowExecutionContext context) throws FalconException {
+        updateJobStatus(context, ADFJsonConstants.ADF_STATUS_SUCCEEDED, 100);
+    }
+
+    @Override
+    public void onFailure(WorkflowExecutionContext context) throws FalconException {
+        updateJobStatus(context, ADFJsonConstants.ADF_STATUS_FAILED, 0);
+    }
+
+    @Override
+    public void onStart(WorkflowExecutionContext context) throws FalconException {
+        updateJobStatus(context, ADFJsonConstants.ADF_STATUS_EXECUTING, 0);
+    }
+
+    @Override
+    public void onSuspend(WorkflowExecutionContext context) throws FalconException {
+        updateJobStatus(context, ADFJsonConstants.ADF_STATUS_CANCELED, 0);
+    }
+
+    @Override
+    public void onWait(WorkflowExecutionContext context) throws FalconException {
+        updateJobStatus(context, ADFJsonConstants.ADF_STATUS_EXECUTING, 0);
+    }
+
+    private void updateJobStatus(String entityName, String entityType) throws FalconException {
+        // Filter non-adf jobs
+        if (!ADFJob.isADFJobEntity(entityName)) {
+            return;
+        }
+
+        Instance instance = instanceManager.getFirstInstance(entityName, entityType);
+        if (instance == null) {
+            return;
+        }
+
+        WorkflowStatus workflowStatus = instance.getStatus();
+        String status;
+        int progress = 0;
+        switch (workflowStatus) {
+        case SUCCEEDED:
+            progress = 100;
+            status = ADFJsonConstants.ADF_STATUS_SUCCEEDED;
+            break;
+        case FAILED:
+        case KILLED:
+        case ERROR:
+        case SKIPPED:
+        case UNDEFINED:
+            status = ADFJsonConstants.ADF_STATUS_FAILED;
+            break;
+        default:
+            status = ADFJsonConstants.ADF_STATUS_EXECUTING;
+        }
+        updateJobStatus(entityName, status, progress, instance.getLogFile());
+    }
+
+    private void updateJobStatus(WorkflowExecutionContext context, String status, int progress) {
+        // Filter non-adf jobs
+        String entityName = context.getEntityName();
+        if (!ADFJob.isADFJobEntity(entityName)) {
+            return;
+        }
+
+        updateJobStatus(entityName, status, progress, context.getLogFile());
+    }
+
+    private void updateJobStatus(String entityName, String status, int progress, String logUrl) {
+        try {
+            String sessionID = ADFJob.getSessionID(entityName);
+            LOG.info("Updating job status: " + sessionID + ", " + entityName + ", " + status + ", " + logUrl);
+            JSONObject obj = new JSONObject();
+            obj.put(ADFJsonConstants.ADF_STATUS_PROTOCOL, ADFJsonConstants.ADF_STATUS_PROTOCOL_NAME);
+            obj.put(ADFJsonConstants.ADF_STATUS_JOBID, sessionID);
+            obj.put(ADFJsonConstants.ADF_STATUS_LOG_URL, logUrl);
+            obj.put(ADFJsonConstants.ADF_STATUS_STATUS, status);
+            obj.put(ADFJsonConstants.ADF_STATUS_PROGRESS, progress);
+            sendStatusUpdate(sessionID, obj.toString());
+        } catch (JSONException | FalconException e) {
+            LOG.error("Error when updating job status", e);
+        }
+    }
+
+    private void sendErrorMessage(String sessionID, String errorMessage) {
+        LOG.info("Sending error message for session " + sessionID + ": " + errorMessage);
+        try {
+            JSONObject obj = new JSONObject();
+            obj.put(ADFJsonConstants.ADF_STATUS_PROTOCOL, ADFJsonConstants.ADF_STATUS_PROTOCOL_NAME);
+            obj.put(ADFJsonConstants.ADF_STATUS_JOBID, sessionID);
+            obj.put(ADFJsonConstants.ADF_STATUS_STATUS, ADFJsonConstants.ADF_STATUS_FAILED);
+            obj.put(ADFJsonConstants.ADF_STATUS_PROGRESS, 0);
+            obj.put(ADFJsonConstants.ADF_STATUS_ERROR_TYPE, ADFJsonConstants.ADF_STATUS_ERROR_TYPE_VALUE);
+            obj.put(ADFJsonConstants.ADF_STATUS_ERROR_MESSAGE, errorMessage);
+            sendStatusUpdate(sessionID, obj.toString());
+        } catch (JSONException e) {
+            LOG.error("Error when sending error message", e);
+        }
+    }
+
+    private void sendStatusUpdate(String sessionID, String message) {
+        LOG.info("Sending update for session " + sessionID + ": " + message);
+        try {
+            InputStream in = IOUtils.toInputStream(message, "UTF-8");
+            BrokeredMessage updateMessage = new BrokeredMessage(in);
+            updateMessage.setSessionId(sessionID);
+            service.sendQueueMessage(statusQueueName, updateMessage);
+        } catch (IOException | ServiceException e) {
+            LOG.error("Error when sending status update", e);
+        }
+    }
+
+    private static class ADFInstanceManager extends AbstractInstanceManager {
+        public Instance getFirstInstance(String entityName, String entityType) throws FalconException {
+            InstancesResult result = getStatus(entityType, entityName, null, null, null, null, "", "", "", 0, 1, null);
+            Instance[] instances = result.getInstances();
+            if (instances.length > 0) {
+                return instances[0];
+            }
+            return null;
+        }
+    }
+}
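The `updateJobStatus(String entityName, String entityType)` method above collapses Falcon's instance states into three ADF states: SUCCEEDED reports progress 100, the terminal non-success states (FAILED, KILLED, ERROR, SKIPPED, UNDEFINED) report failure, and everything else is reported as still executing. The following is a minimal standalone sketch of that mapping, not the provider's code: `WorkflowStatusSketch` is a hypothetical mirror of Falcon's `WorkflowStatus` (RUNNING and WAITING are assumed examples of states that reach the `default` branch), and the `ADF_*` strings are placeholders because the real `ADFJsonConstants` values are not shown in this diff.

```java
// Hypothetical mirror of Falcon's WorkflowStatus; RUNNING and WAITING are assumed
// examples of states that fall through to the default branch.
enum WorkflowStatusSketch {
    RUNNING, WAITING, SUCCEEDED, FAILED, KILLED, ERROR, SKIPPED, UNDEFINED
}

final class AdfStatusMapping {
    // Placeholder values; the real strings live in ADFJsonConstants.
    static final String ADF_EXECUTING = "Executing";
    static final String ADF_SUCCEEDED = "Succeeded";
    static final String ADF_FAILED = "Failed";

    /** ADF status string plus progress percentage, as carried in the status message. */
    static final class AdfStatus {
        final String status;
        final int progress;

        AdfStatus(String status, int progress) {
            this.status = status;
            this.progress = progress;
        }
    }

    private AdfStatusMapping() {
    }

    static AdfStatus map(WorkflowStatusSketch workflowStatus) {
        switch (workflowStatus) {
        case SUCCEEDED:
            return new AdfStatus(ADF_SUCCEEDED, 100);
        case FAILED:
        case KILLED:
        case ERROR:
        case SKIPPED:
        case UNDEFINED:
            return new AdfStatus(ADF_FAILED, 0);
        default:
            // Every other state is reported as still executing with no progress.
            return new AdfStatus(ADF_EXECUTING, 0);
        }
    }
}
```

Note that SKIPPED and UNDEFINED land in the failure group, so ADF sees such instances as failed activities rather than as pending ones.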

http://git-wip-us.apache.org/repos/asf/falcon/blob/ad18b024/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFReplicationJob.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFReplicationJob.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFReplicationJob.java
new file mode 100644
index 0000000..f847a82
--- /dev/null
+++ b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFReplicationJob.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.adfservice;
+
+import java.net.URISyntaxException;
+
+import org.apache.falcon.adfservice.util.FSUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.EntityType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Azure Data Factory (ADF) replication job (Hive/HDFS to Azure blobs).
+ */
+public class ADFReplicationJob extends ADFJob {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ADFReplicationJob.class);
+
+    public static final String TEMPLATE_REPLICATION_FEED = "replicate-feed.xml";
+    public static final String REPLICATION_TARGET_CLUSTER = "adf-replication-target-cluster";
+
+    public ADFReplicationJob(String message, String id) throws FalconException {
+        super(message, id);
+        type = JobType.REPLICATION;
+    }
+
+    @Override
+    public void startJob() throws FalconException {
+        try {
+            // Note: this first iteration supports only one input table and one output table per replication job
+            String inputTableName = getInputTables().get(0);
+            String outputTableName = getOutputTables().get(0);
+            String template = FSUtils.readHDFSFile(TEMPLATE_PATH_PREFIX, TEMPLATE_REPLICATION_FEED);
+            String message = template.replace("$feedName$", jobEntityName())
+                    .replace("$frequency$", frequency)
+                    .replace("$startTime$", startTime)
+                    .replace("$endTime$", endTime)
+                    .replace("$clusterSource$", getTableCluster(inputTableName))
+                    .replace("$clusterTarget$", REPLICATION_TARGET_CLUSTER)
+                    .replace("$sourceLocation$", getADFTablePath(inputTableName))
+                    .replace("$targetLocation$", getADFTablePath(outputTableName));
+            submitAndScheduleJob(EntityType.FEED.name(), message);
+        } catch (URISyntaxException e) {
+            throw new FalconException("Error when generating replication feed xml", e);
+        }
+    }
+
+    @Override
+    public void cleanup() throws FalconException {
+        // Delete the feed entity. Should be called after the job succeeds or fails.
+        jobManager.deleteEntity(EntityType.FEED.name(), jobEntityName());
+    }
+}
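`ADFReplicationJob.startJob()` fills `replicate-feed.xml` by chaining `String.replace` calls over `$name$` placeholders. The sketch below performs the same substitution in a single pass driven by a map, under two assumptions: placeholder names consist of letters only, and every `$name$` token in a template is meant to be substituted. Unlike chained `replace` calls, which leave an unknown token in the XML untouched, it fails loudly on a placeholder with no value; Falcon EL expressions such as `${YEAR}` do not match the pattern and pass through unchanged. `TemplateSketch` is a hypothetical helper, not part of this commit.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical single-pass renderer for $name$ placeholders. */
final class TemplateSketch {
    // Assumption: placeholder names are letters only, e.g. $feedName$ or $aclowner$.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$([A-Za-z]+)\\$");

    private TemplateSketch() {
    }

    static String render(String template, Map<String, String> values) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        int last = 0;
        while (matcher.find()) {
            String key = matcher.group(1);
            String value = values.get(key);
            if (value == null) {
                throw new IllegalArgumentException("No value for placeholder $" + key + "$");
            }
            // Append the literal text before the token, then the value itself
            // (plain append, so '$' or '\' inside a value needs no escaping).
            out.append(template, last, matcher.start()).append(value);
            last = matcher.end();
        }
        out.append(template, last, template.length());
        return out.toString();
    }
}
```

Because each token is replaced exactly once, a value that happens to contain text like `$frequency$` is not substituted a second time, which chained `replace` calls would do.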

http://git-wip-us.apache.org/repos/asf/falcon/blob/ad18b024/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFScheduledExecutor.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFScheduledExecutor.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFScheduledExecutor.java
new file mode 100644
index 0000000..df5a993
--- /dev/null
+++ b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFScheduledExecutor.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.adfservice;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * ADF thread pool executor.
+ */
+public class ADFScheduledExecutor extends ScheduledThreadPoolExecutor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ADFScheduledExecutor.class);
+
+    public ADFScheduledExecutor(int corePoolSize) {
+        super(corePoolSize);
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+        return super.scheduleAtFixedRate(wrapRunnable(command), initialDelay, period, unit);
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+        return super.scheduleWithFixedDelay(wrapRunnable(command), initialDelay, delay, unit);
+    }
+
+    private Runnable wrapRunnable(Runnable command) {
+        return new LogOnExceptionRunnable(command);
+    }
+
+    private static class LogOnExceptionRunnable implements Runnable {
+        private final Runnable runnable;
+
+        public LogOnExceptionRunnable(Runnable runnable) {
+            this.runnable = runnable;
+        }
+
+        @Override
+        public void run() {
+            try {
+                runnable.run();
+            } catch (Throwable t) {
+                // Log before rethrowing; otherwise the executor only records the failure in the future.
+                LOG.error("Error while executing scheduled task", t);
+                throw new RuntimeException(t);
+            }
+        }
+    }
+}
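`ADFScheduledExecutor` wraps every periodic task because `ScheduledThreadPoolExecutor` does not log exceptions thrown by periodic tasks: the exception is stored in the returned `ScheduledFuture` and later runs of that task are suppressed. The sketch below reproduces the wrapper with `java.util.logging` instead of SLF4J so that it has no external dependencies; `LoggingWrapperDemo` and `failureOfPeriodicTask` are hypothetical names. It schedules a task that throws on its first run and returns the failure recorded in the future.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

final class LoggingWrapperDemo {
    private static final Logger LOG = Logger.getLogger(LoggingWrapperDemo.class.getName());

    private LoggingWrapperDemo() {
    }

    /** Same idea as LogOnExceptionRunnable: log the failure, then rethrow it wrapped. */
    static Runnable logOnException(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                LOG.log(Level.SEVERE, "Error while executing scheduled task", t);
                throw new RuntimeException(t);
            }
        };
    }

    /** Schedules a task that fails on its first run and returns what the future recorded. */
    static Throwable failureOfPeriodicTask(AtomicInteger runs) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        try {
            ScheduledFuture<?> future = executor.scheduleAtFixedRate(logOnException(() -> {
                runs.incrementAndGet();
                throw new IllegalStateException("simulated failure");
            }), 0, 10, TimeUnit.MILLISECONDS);
            future.get(5, TimeUnit.SECONDS);
            throw new IllegalStateException("periodic task completed without failing");
        } catch (ExecutionException e) {
            // The wrapper's RuntimeException, whose cause is the task's own exception.
            return e.getCause();
        } catch (TimeoutException e) {
            throw new IllegalStateException("periodic task did not fail in time", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Because the wrapper rethrows, the periodic task still stops after its first failure (the run counter stays at 1); the wrapper only makes that failure visible in the log.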

http://git-wip-us.apache.org/repos/asf/falcon/blob/ad18b024/addons/adf/src/main/java/org/apache/falcon/adfservice/DataFeed.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/DataFeed.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/DataFeed.java
new file mode 100644
index 0000000..32d2757
--- /dev/null
+++ b/addons/adf/src/main/java/org/apache/falcon/adfservice/DataFeed.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.adfservice;
+
+import org.apache.falcon.adfservice.util.FSUtils;
+import org.apache.falcon.FalconException;
+
+import java.net.URISyntaxException;
+
+/**
+ * Class for data Feed.
+ */
+public class DataFeed extends Feed {
+    private static final String FEED_TEMPLATE_FILE = "feed.xml";
+    private String locationPath;
+
+    public DataFeed(final Builder builder) {
+        this.feedName = builder.name;
+        this.clusterName = builder.feedClusterName;
+        this.frequency = builder.feedFrequency;
+        this.startTime = builder.feedStartTime;
+        this.endTime = builder.feedEndTime;
+        this.locationPath = builder.feedLocationPath;
+        this.aclOwner = builder.feedAclOwner;
+    }
+
+    @Override
+    public String getEntityxml() throws FalconException {
+        try {
+            String template = FSUtils.readHDFSFile(ADFJob.TEMPLATE_PATH_PREFIX, FEED_TEMPLATE_FILE);
+            return template.replace("$feedName$", feedName)
+                    .replace("$frequency$", frequency)
+                    .replace("$startTime$", startTime)
+                    .replace("$endTime$", endTime)
+                    .replace("$cluster$", clusterName)
+                    .replace("$location$", locationPath)
+                    .replace("$aclowner$", aclOwner);
+        } catch (URISyntaxException e) {
+            throw new FalconException("Error when generating entity xml for data feed", e);
+        }
+    }
+
+    /**
+     * Builder for data Feed.
+     */
+    public static class Builder {
+        private String name;
+        private String feedClusterName;
+        private String feedFrequency;
+        private String feedStartTime;
+        private String feedEndTime;
+        private String feedLocationPath;
+        private String feedAclOwner;
+
+        public DataFeed build() {
+            return new DataFeed(this);
+        }
+
+        public Builder withFeedName(final String feedName) {
+            this.name = feedName;
+            return this;
+        }
+
+        public Builder withClusterName(final String clusterName) {
+            this.feedClusterName = clusterName;
+            return this;
+        }
+
+        public Builder withFrequency(final String frequency) {
+            this.feedFrequency = frequency;
+            return this;
+        }
+
+        public Builder withStartTime(final String startTime) {
+            this.feedStartTime = startTime;
+            return this;
+        }
+
+        public Builder withEndTime(final String endTime) {
+            this.feedEndTime = endTime;
+            return this;
+        }
+
+        public Builder withLocationPath(final String locationPath) {
+            this.feedLocationPath = locationPath;
+            return this;
+        }
+
+        public Builder withAclOwner(final String aclOwner) {
+            this.feedAclOwner = aclOwner;
+            return this;
+        }
+    }
+}
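`DataFeed.getEntityxml()` passes each builder field straight to `String.replace`, which throws a `NullPointerException` when the replacement is null, so a forgotten builder call only surfaces when the XML is rendered. Below is a minimal sketch of validating required fields when the object is built instead. `DataFeedSketch` is hypothetical and keeps only four of the seven fields; start time, end time and ACL owner would be checked the same way.

```java
import java.util.Objects;

/** Hypothetical reduced DataFeed that rejects missing fields when built. */
final class DataFeedSketch {
    final String name;
    final String clusterName;
    final String frequency;
    final String locationPath;

    private DataFeedSketch(Builder builder) {
        // Fail here rather than with an NPE from String.replace during rendering.
        this.name = Objects.requireNonNull(builder.name, "feed name is required");
        this.clusterName = Objects.requireNonNull(builder.clusterName, "cluster name is required");
        this.frequency = Objects.requireNonNull(builder.frequency, "frequency is required");
        this.locationPath = Objects.requireNonNull(builder.locationPath, "location path is required");
    }

    static final class Builder {
        private String name;
        private String clusterName;
        private String frequency;
        private String locationPath;

        Builder withFeedName(String feedName) {
            this.name = feedName;
            return this;
        }

        Builder withClusterName(String cluster) {
            this.clusterName = cluster;
            return this;
        }

        Builder withFrequency(String feedFrequency) {
            this.frequency = feedFrequency;
            return this;
        }

        Builder withLocationPath(String path) {
            this.locationPath = path;
            return this;
        }

        DataFeedSketch build() {
            return new DataFeedSketch(this);
        }
    }
}
```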

http://git-wip-us.apache.org/repos/asf/falcon/blob/ad18b024/addons/adf/src/main/java/org/apache/falcon/adfservice/Feed.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/Feed.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/Feed.java
new file mode 100644
index 0000000..d05f300
--- /dev/null
+++ b/addons/adf/src/main/java/org/apache/falcon/adfservice/Feed.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.adfservice;
+
+import org.apache.falcon.FalconException;
+
+/**
+ * Abstract class for feed.
+ */
+public abstract class Feed {
+    protected String feedName;
+    protected String clusterName;
+    protected String frequency;
+    protected String startTime;
+    protected String endTime;
+    protected String aclOwner;
+
+    public String getName() {
+        return feedName;
+    }
+
+    public abstract String getEntityxml() throws FalconException;
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/ad18b024/addons/adf/src/main/java/org/apache/falcon/adfservice/Process.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/Process.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/Process.java
new file mode 100644
index 0000000..3a65753
--- /dev/null
+++ b/addons/adf/src/main/java/org/apache/falcon/adfservice/Process.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.adfservice;
+
+import org.apache.falcon.adfservice.util.FSUtils;
+import org.apache.falcon.FalconException;
+
+import java.net.URISyntaxException;
+
+/**
+ * Class for process.
+ */
+public class Process {
+    private static final String PROCESS_TEMPLATE_FILE = "process.xml";
+
+    private String entityName;
+    private String frequency;
+    private String startTime;
+    private String endTime;
+    private String clusterName;
+    private String inputFeedName;
+    private String outputFeedName;
+    private String engineType;
+    private String wfPath;
+    private String aclOwner;
+
+    public Process(final Builder builder) {
+        this.entityName = builder.name;
+        this.clusterName = builder.processClusterName;
+        this.frequency = builder.processFrequency;
+        this.startTime = builder.processStartTime;
+        this.endTime = builder.processEndTime;
+        this.inputFeedName = builder.processInputFeedName;
+        this.outputFeedName = builder.processOutputFeedName;
+        this.engineType = builder.processEngineType;
+        this.wfPath = builder.processWfPath;
+        this.aclOwner = builder.processAclOwner;
+    }
+
+    public String getName() {
+        return entityName;
+    }
+
+    public String getEntityxml() throws FalconException {
+        try {
+            String template = FSUtils.readHDFSFile(ADFJob.TEMPLATE_PATH_PREFIX, PROCESS_TEMPLATE_FILE);
+            return template.replace("$processName$", entityName)
+                    .replace("$frequency$", frequency)
+                    .replace("$startTime$", startTime)
+                    .replace("$endTime$", endTime)
+                    .replace("$clusterName$", clusterName)
+                    .replace("$inputFeedName$", inputFeedName)
+                    .replace("$outputFeedName$", outputFeedName)
+                    .replace("$engine$", engineType)
+                    .replace("$scriptPath$", wfPath)
+                    .replace("$aclowner$", aclOwner);
+        } catch (URISyntaxException e) {
+            throw new FalconException("Error when generating process xml", e);
+        }
+    }
+
+    /**
+     * Builder for process.
+     */
+    public static class Builder {
+        private String name;
+        private String processClusterName;
+        private String processFrequency;
+        private String processStartTime;
+        private String processEndTime;
+        private String processInputFeedName;
+        private String processOutputFeedName;
+        private String processEngineType;
+        private String processWfPath;
+        private String processAclOwner;
+
+        public Process build() {
+            return new Process(this);
+        }
+
+        public Builder withProcessName(final String processName) {
+            this.name = processName;
+            return this;
+        }
+
+        public Builder withClusterName(final String clusterName) {
+            this.processClusterName = clusterName;
+            return this;
+        }
+
+        public Builder withFrequency(final String frequency) {
+            this.processFrequency = frequency;
+            return this;
+        }
+
+        public Builder withStartTime(final String startTime) {
+            this.processStartTime = startTime;
+            return this;
+        }
+
+        public Builder withEndTime(final String endTime) {
+            this.processEndTime = endTime;
+            return this;
+        }
+
+        public Builder withInputFeedName(final String inputFeedName) {
+            this.processInputFeedName = inputFeedName;
+            return this;
+        }
+
+        public Builder withOutputFeedName(final String outputFeedName) {
+            this.processOutputFeedName = outputFeedName;
+            return this;
+        }
+
+        public Builder withAclOwner(final String aclOwner) {
+            this.processAclOwner = aclOwner;
+            return this;
+        }
+
+        public Builder withEngineType(final String engineType) {
+            this.processEngineType = engineType;
+            return this;
+        }
+
+        public Builder withWFPath(final String wfPath) {
+            this.processWfPath = wfPath;
+            return this;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/ad18b024/addons/adf/src/main/java/org/apache/falcon/adfservice/TableFeed.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/TableFeed.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/TableFeed.java
new file mode 100644
index 0000000..a3e11ef
--- /dev/null
+++ b/addons/adf/src/main/java/org/apache/falcon/adfservice/TableFeed.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.adfservice;
+
+import org.apache.falcon.adfservice.util.FSUtils;
+import org.apache.falcon.FalconException;
+
+import java.net.URISyntaxException;
+
+/**
+ * Table Feed.
+ */
+public class TableFeed extends Feed {
+    private static final String TABLE_FEED_TEMPLATE_FILE = "table-feed.xml";
+    private static final String TABLE_PARTITION_SEPARATOR = "#";
+
+    private String tableName;
+    private String partitions;
+
+    public TableFeed(final Builder builder) {
+        this.feedName = builder.tableFeedName;
+        this.clusterName = builder.feedClusterName;
+        this.frequency = builder.feedFrequency;
+        this.startTime = builder.feedStartTime;
+        this.endTime = builder.feedEndTime;
+        this.tableName = builder.feedTableName;
+        this.aclOwner = builder.feedAclOwner;
+        this.partitions = builder.feedPartitions;
+    }
+
+    private String getTable() {
+        return tableName + TABLE_PARTITION_SEPARATOR + partitions;
+    }
+
+    @Override
+    public String getEntityxml() throws FalconException {
+        try {
+            String template = FSUtils.readHDFSFile(ADFJob.TEMPLATE_PATH_PREFIX, TABLE_FEED_TEMPLATE_FILE);
+            return template.replace("$feedName$", feedName)
+                    .replace("$frequency$", frequency)
+                    .replace("$startTime$", startTime)
+                    .replace("$endTime$", endTime)
+                    .replace("$cluster$", clusterName)
+                    .replace("$table$", getTable())
+                    .replace("$aclowner$", aclOwner);
+        } catch (URISyntaxException e) {
+            throw new FalconException("Error when generating entity xml for table feed", e);
+        }
+    }
+
+    /**
+     * Builder for table Feed.
+     */
+    public static class Builder {
+        private String tableFeedName;
+        private String feedClusterName;
+        private String feedFrequency;
+        private String feedStartTime;
+        private String feedEndTime;
+        private String feedTableName;
+        private String feedAclOwner;
+        private String feedPartitions;
+
+        public TableFeed build() {
+            return new TableFeed(this);
+        }
+
+        public Builder withFeedName(final String feedName) {
+            this.tableFeedName = feedName;
+            return this;
+        }
+
+        public Builder withClusterName(final String clusterName) {
+            this.feedClusterName = clusterName;
+            return this;
+        }
+
+        public Builder withFrequency(final String frequency) {
+            this.feedFrequency = frequency;
+            return this;
+        }
+
+        public Builder withStartTime(final String startTime) {
+            this.feedStartTime = startTime;
+            return this;
+        }
+
+        public Builder withEndTime(final String endTime) {
+            this.feedEndTime = endTime;
+            return this;
+        }
+
+        public Builder withTableName(final String tableName) {
+            this.feedTableName = tableName;
+            return this;
+        }
+
+        public Builder withAclOwner(final String aclOwner) {
+            this.feedAclOwner = aclOwner;
+            return this;
+        }
+
+        public Builder withPartitions(final String partitions) {
+            this.feedPartitions = partitions;
+            return this;
+        }
+    }
+
+}
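`TableFeed.getTable()` builds the value substituted for `$table$` by joining the table name and the partition spec with `#`. String concatenation turns a missing value into the literal text `null`, so a `TableFeed` built without `withPartitions(...)` would yield `tableName#null` rather than an error. The sketch below joins and splits such a spec with that check added. `TableSpecSketch` is hypothetical, and the sample value in the usage assumes Falcon's `catalog:database:table#partition` table URI style, which this diff does not confirm the `tableName` field follows.

```java
import java.util.Objects;

/** Hypothetical helper for "table#partitions" specs. */
final class TableSpecSketch {
    private static final String SEPARATOR = "#";

    private TableSpecSketch() {
    }

    static String join(String tableName, String partitions) {
        // Reject nulls instead of silently producing "table#null".
        return Objects.requireNonNull(tableName, "table name is required")
                + SEPARATOR
                + Objects.requireNonNull(partitions, "partitions are required");
    }

    /** Splits at the first '#', so any later '#' stays in the partition part. */
    static String[] split(String tableSpec) {
        int index = tableSpec.indexOf(SEPARATOR);
        if (index < 0) {
            throw new IllegalArgumentException("Missing '" + SEPARATOR + "' in " + tableSpec);
        }
        return new String[] {tableSpec.substring(0, index), tableSpec.substring(index + 1)};
    }
}
```

For example, `TableSpecSketch.join("catalog:default:clicks", "ds=${YEAR}")` gives `catalog:default:clicks#ds=${YEAR}`, and `split` recovers the two parts.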

