falcon-commits mailing list archives

From b...@apache.org
Subject [1/2] falcon git commit: FALCON-1861 Support HDFS Snapshot based replication in Falcon
Date Thu, 21 Apr 2016 23:48:22 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 7c0481eac -> aba79aae2


http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java b/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
index 1457b06..29fcdb9 100644
--- a/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
+++ b/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
@@ -40,11 +40,14 @@ public final class EvictionHelper {
     private EvictionHelper(){}
 
     public static Pair<Date, Date> getDateRange(String period) throws ELException {
-        Long duration = (Long) EVALUATOR.evaluate("${" + period + "}",
-                Long.class, RESOLVER, RESOLVER);
+        Long duration = evalExpressionToMilliSeconds(period);
         Date end = new Date();
         Date start = new Date(end.getTime() - duration);
         return Pair.of(start, end);
     }
 
+    public static Long evalExpressionToMilliSeconds(String period) throws ELException {
+        return (Long) EVALUATOR.evaluate("${" + period + "}", Long.class, RESOLVER, RESOLVER);
+    }
+
 }
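
For reference, the refactored helper can now be called directly to turn a retention EL expression into milliseconds. A minimal usage sketch, assuming falcon-common is on the classpath (the wrapper class is hypothetical; the expected value matches EvictionHelperTest below):

    import org.apache.falcon.retention.EvictionHelper;

    public class EvictionHelperUsage {
        public static void main(String[] args) throws Exception {
            // "hours(5)" is a retention EL expression; it evaluates to 5 * 60 * 60 * 1000 ms,
            // matching the assertion in EvictionHelperTest below.
            long millis = EvictionHelper.evalExpressionToMilliSeconds("hours(5)");
            System.out.println(millis); // 18000000
        }
    }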

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index b7bac73..7a850f8 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -73,7 +73,7 @@
 *.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory
 
 ##### List of shared libraries for Falcon workflows #####
-*.shared.libs=activemq-all,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3
+*.shared.libs=activemq-all,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3,commons-el
 
 ##### Workflow Job Execution Completion listeners #####
 *.workflow.execution.listeners=

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/common/src/test/java/org/apache/falcon/retention/EvictionHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/retention/EvictionHelperTest.java b/common/src/test/java/org/apache/falcon/retention/EvictionHelperTest.java
new file mode 100644
index 0000000..ce40068
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/retention/EvictionHelperTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.retention;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for EvictionHelper.
+ */
+public class EvictionHelperTest {
+    @Test
+    public void testEvictionHelper() throws Exception {
+        Assert.assertEquals(EvictionHelper.evalExpressionToMilliSeconds("days(3)").longValue(), 259200000);
+        Assert.assertEquals(EvictionHelper.evalExpressionToMilliSeconds("days(1)").longValue(), 86400000);
+        Assert.assertEquals(EvictionHelper.evalExpressionToMilliSeconds("hours(5)").longValue(), 18000000);
+        Assert.assertEquals(EvictionHelper.evalExpressionToMilliSeconds("minutes(5)").longValue(), 300000);
+        Assert.assertEquals(EvictionHelper.evalExpressionToMilliSeconds("minutes(1)").longValue(), 60000);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/extensions/pom.xml b/extensions/pom.xml
index 6a0725a..eb2faea 100644
--- a/extensions/pom.xml
+++ b/extensions/pom.xml
@@ -43,6 +43,30 @@
                     <groupId>org.apache.hadoop</groupId>
                     <artifactId>hadoop-client</artifactId>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-auth</artifactId>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-hdfs</artifactId>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-hdfs</artifactId>
+                    <classifier>tests</classifier>
+                    <scope>test</scope>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                    <classifier>tests</classifier>
+                    <scope>test</scope>
+                </dependency>
             </dependencies>
         </profile>
     </profiles>
@@ -77,6 +101,11 @@
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-test-util</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/main/java/org/apache/falcon/extensions/AbstractExtension.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/AbstractExtension.java b/extensions/src/main/java/org/apache/falcon/extensions/AbstractExtension.java
index 11b3725..24bbb87 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/AbstractExtension.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/AbstractExtension.java
@@ -20,6 +20,7 @@ package org.apache.falcon.extensions;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.extensions.mirroring.hdfs.HdfsMirroringExtension;
+import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirroringExtension;
 import org.apache.falcon.extensions.mirroring.hive.HiveMirroringExtension;
 
 import java.util.ArrayList;
@@ -33,12 +34,14 @@ import java.util.Properties;
 public abstract class AbstractExtension {
     private static final List<String> TRUSTED_EXTENSIONS = Arrays.asList(
             new HdfsMirroringExtension().getName().toUpperCase(),
+            new HdfsSnapshotMirroringExtension().getName().toUpperCase(),
             new HiveMirroringExtension().getName().toUpperCase());
     private static List<AbstractExtension> extensions = new ArrayList<>();
 
     public static List<AbstractExtension> getExtensions() {
         if (extensions.isEmpty()) {
             extensions.add(new HdfsMirroringExtension());
+            extensions.add(new HdfsSnapshotMirroringExtension());
             extensions.add(new HiveMirroringExtension());
         }
         return extensions;
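
With the registration above, the new extension becomes discoverable alongside the HDFS and Hive mirroring extensions. A minimal lookup sketch (the helper class and method are illustrative, assuming AbstractExtension exposes getName()):

    import org.apache.falcon.extensions.AbstractExtension;

    public class ExtensionLookupSketch {
        // Resolves a registered extension by its (case-insensitive) name,
        // e.g. "HDFS-SNAPSHOT-MIRRORING" for the new snapshot extension.
        public static AbstractExtension findByName(String name) {
            for (AbstractExtension extension : AbstractExtension.getExtensions()) {
                if (extension.getName().equalsIgnoreCase(name)) {
                    return extension;
                }
            }
            return null;
        }
    }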

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirrorProperties.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirrorProperties.java b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirrorProperties.java
new file mode 100644
index 0000000..f179896
--- /dev/null
+++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirrorProperties.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.extensions.mirroring.hdfsSnapshot;
+
+/**
+ * Hdfs Snapshot Extension properties.
+ */
+public enum HdfsSnapshotMirrorProperties {
+    SOURCE_CLUSTER("sourceCluster", "Snapshot replication source cluster", true),
+    SOURCE_NN("sourceNN", "Snapshot replication source cluster namenode", false),
+    SOURCE_EXEC_URL("sourceExecUrl", "Snapshot replication source execute endpoint", false),
+    SOURCE_NN_KERBEROS_PRINCIPAL("sourceNNKerberosPrincipal",
+            "Snapshot replication source kerberos principal", false),
+
+    SOURCE_SNAPSHOT_DIR("sourceSnapshotDir", "Location of source snapshot path", true),
+    SOURCE_SNAPSHOT_RETENTION_POLICY("sourceSnapshotRetentionPolicy", "Retention policy for source snapshots", false),
+    SOURCE_SNAPSHOT_RETENTION_AGE_LIMIT("sourceSnapshotRetentionAgeLimit",
+            "Delete source snapshots older than this age", true),
+    SOURCE_SNAPSHOT_RETENTION_NUMBER("sourceSnapshotRetentionNumber",
+            "Number of latest source snapshots to retain on source", true),
+
+    TARGET_CLUSTER("targetCluster", "Snapshot replication target cluster", true),
+    TARGET_NN("targetNN", "Snapshot replication target cluster namenode", false),
+    TARGET_EXEC_URL("targetExecUrl", "Snapshot replication target execute endpoint", false),
+    TARGET_NN_KERBEROS_PRINCIPAL("targetNNKerberosPrincipal",
+            "Snapshot replication target kerberos principal", false),
+
+    TARGET_SNAPSHOT_DIR("targetSnapshotDir", "Location of target snapshot path", true),
+    TARGET_SNAPSHOT_RETENTION_POLICY("targetSnapshotRetentionPolicy", "Retention policy for target snapshots", false),
+    TARGET_SNAPSHOT_RETENTION_AGE_LIMIT("targetSnapshotRetentionAgeLimit",
+            "Delete target snapshots older than this age", true),
+    TARGET_SNAPSHOT_RETENTION_NUMBER("targetSnapshotRetentionNumber",
+            "Number of latest target snapshots to retain on source", true),
+
+    DISTCP_MAX_MAPS("distcpMaxMaps", "Maximum number of maps used during distcp", false),
+    MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each mapper during replication", false),
+
+    TDE_ENCRYPTION_ENABLED("tdeEncryptionEnabled", "Is TDE encryption enabled on source and target", false),
+    SNAPSHOT_JOB_NAME("snapshotJobName", "Name of snapshot based mirror job", false);
+
+
+    private final String name;
+    private final String description;
+    private final boolean isRequired;
+
+    HdfsSnapshotMirrorProperties(String name, String description, boolean isRequired) {
+        this.name = name;
+        this.description = description;
+        this.isRequired = isRequired;
+    }
+
+    public String getName() {
+        return this.name;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public boolean isRequired() {
+        return isRequired;
+    }
+
+    @Override
+    public String toString() {
+        return getName();
+    }
+}
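
The required flag above (third constructor argument) marks the minimum set a caller must supply; the common job properties (name, cluster, validity, frequency) come from ExtensionProperties. A hedged sketch of such a property set, with illustrative cluster names, paths and retention values:

    import java.util.Properties;

    import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties;

    public class SnapshotMirrorPropertiesSketch {
        // Builds only the options flagged isRequired == true above; all values are illustrative.
        public static Properties requiredSnapshotProperties() {
            Properties props = new Properties();
            props.setProperty(HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName(), "primaryCluster");
            props.setProperty(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName(), "/users/source/file1");
            props.setProperty(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_AGE_LIMIT.getName(), "days(7)");
            props.setProperty(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_NUMBER.getName(), "7");
            props.setProperty(HdfsSnapshotMirrorProperties.TARGET_CLUSTER.getName(), "backupCluster");
            props.setProperty(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName(), "/users/target/file1");
            props.setProperty(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_AGE_LIMIT.getName(), "days(7)");
            props.setProperty(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_NUMBER.getName(), "7");
            return props;
        }
    }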

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirroringExtension.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirroringExtension.java b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirroringExtension.java
new file mode 100644
index 0000000..09cce3b
--- /dev/null
+++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirroringExtension.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.extensions.mirroring.hdfsSnapshot;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.extensions.AbstractExtension;
+import org.apache.falcon.extensions.ExtensionProperties;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.security.SecurityUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Hdfs snapshot mirroring extension.
+ */
+public class HdfsSnapshotMirroringExtension extends AbstractExtension {
+    private static final Logger LOG = LoggerFactory.getLogger(HdfsSnapshotMirroringExtension.class);
+    private static final String EXTENSION_NAME = "HDFS-SNAPSHOT-MIRRORING";
+    private static final String DEFAULT_RETENTION_POLICY = "delete";
+    public static final String EMPTY_KERBEROS_PRINCIPAL = "NA";
+
+    @Override
+    public String getName() {
+        return EXTENSION_NAME;
+    }
+
+    @Override
+    public void validate(final Properties extensionProperties) throws FalconException {
+        for (HdfsSnapshotMirrorProperties option : HdfsSnapshotMirrorProperties.values()) {
+            if (extensionProperties.getProperty(option.getName()) == null && option.isRequired()) {
+                throw new FalconException("Missing extension property: " + option.getName());
+            }
+        }
+
+        Cluster sourceCluster = ClusterHelper.getCluster(extensionProperties.getProperty(
+                HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName()));
+        if (sourceCluster == null) {
+            throw new FalconException("SourceCluster entity "
+                    + HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName() + " not found");
+        }
+        Cluster targetCluster = ClusterHelper.getCluster(extensionProperties.getProperty(
+                HdfsSnapshotMirrorProperties.TARGET_CLUSTER.getName()));
+        if (targetCluster == null) {
+            throw new FalconException("TargetCluster entity "
+                    + HdfsSnapshotMirrorProperties.TARGET_CLUSTER.getName() + " not found");
+        }
+
+        Configuration sourceConf = ClusterHelper.getConfiguration(sourceCluster);
+        Configuration targetConf = ClusterHelper.getConfiguration(targetCluster);
+        DistributedFileSystem sourceFileSystem =
+                HadoopClientFactory.get().createDistributedProxiedFileSystem(sourceConf);
+        DistributedFileSystem targetFileSystem =
+                HadoopClientFactory.get().createDistributedProxiedFileSystem(targetConf);
+
+        Path sourcePath = new Path(extensionProperties.getProperty(
+                HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName()));
+        Path targetPath = new Path(extensionProperties.getProperty(
+                HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName()));
+
+        // check if source and target paths exist and are snapshottable
+        try {
+            if (sourceFileSystem.exists(sourcePath)) {
+                if (!isDirSnapshotable(sourceFileSystem, sourcePath)) {
+                    throw new FalconException(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName()
+                            + " " + sourcePath.toString() + " does not allow snapshots.");
+                }
+            } else {
+                throw new FalconException(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName()
+                        + " " + sourcePath.toString() + " does not exist.");
+            }
+            if (targetFileSystem.exists(targetPath)) {
+                if (!isDirSnapshotable(targetFileSystem, targetPath)) {
+                    throw new FalconException(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName()
+                            + " " + targetPath.toString() + " does not allow snapshots.");
+                }
+            } else {
+                throw new FalconException(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName()
+                        + " " + targetPath.toString() + " does not exist.");
+            }
+        } catch (IOException e) {
+            throw new FalconException(e.getMessage(), e);
+        }
+
+
+    }
+
+    private static boolean isDirSnapshotable(DistributedFileSystem hdfs, Path path) throws FalconException {
+        try {
+            LOG.debug("HDFS Snapshot extension validating if dir {} is snapshotable.", path.toString());
+            SnapshottableDirectoryStatus[] snapshotableDirs = hdfs.getSnapshottableDirListing();
+            if (snapshotableDirs != null && snapshotableDirs.length > 0) {
+                for (SnapshottableDirectoryStatus dir : snapshotableDirs) {
+                    if (dir.getFullPath().toString().equals(path.toString())) {
+                        return true;
+                    }
+                }
+            }
+            return false;
+        } catch (IOException e) {
+            LOG.error("Unable to verify if dir {} is snapshot-able. {}", path.toString(), e.getMessage());
+            throw new FalconException("Unable to verify if dir " + path.toString() + " is snapshot-able", e);
+        }
+    }
+
+    @Override
+    public Properties getAdditionalProperties(final Properties extensionProperties) throws FalconException {
+        Properties additionalProperties = new Properties();
+
+        // Add default properties if not passed
+        String distcpMaxMaps = extensionProperties.getProperty(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName());
+        if (StringUtils.isBlank(distcpMaxMaps)) {
+            additionalProperties.put(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName(), "1");
+        }
+
+        String distcpMapBandwidth = extensionProperties.getProperty(
+                HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName());
+        if (StringUtils.isBlank(distcpMapBandwidth)) {
+            additionalProperties.put(HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName(), "100");
+        }
+
+        String tdeEnabled = extensionProperties.getProperty(
+                HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName());
+        if (StringUtils.isNotBlank(tdeEnabled) && Boolean.parseBoolean(tdeEnabled)) {
+            additionalProperties.put(HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName(), "true");
+        } else {
+            additionalProperties.put(HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName(), "false");
+        }
+
+        // Add sourceCluster properties
+        Cluster sourceCluster = ClusterHelper.getCluster(extensionProperties.getProperty(
+                HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName()));
+        if (sourceCluster == null) {
+            LOG.error("Cluster entity {} not found", HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName());
+            throw new FalconException("Cluster entity "
+                    + HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName() + " not found");
+        }
+        additionalProperties.put(HdfsSnapshotMirrorProperties.SOURCE_NN.getName(),
+                ClusterHelper.getStorageUrl(sourceCluster));
+        additionalProperties.put(HdfsSnapshotMirrorProperties.SOURCE_EXEC_URL.getName(),
+                ClusterHelper.getMREndPoint(sourceCluster));
+        String sourceKerberosPrincipal = ClusterHelper.getPropertyValue(sourceCluster, SecurityUtil.NN_PRINCIPAL);
+        if (StringUtils.isBlank(sourceKerberosPrincipal)) {
+            sourceKerberosPrincipal = EMPTY_KERBEROS_PRINCIPAL;
+        }
+        additionalProperties.put(HdfsSnapshotMirrorProperties.SOURCE_NN_KERBEROS_PRINCIPAL.getName(),
+                sourceKerberosPrincipal);
+
+        String sourceRetentionPolicy = extensionProperties.getProperty(
+                HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_POLICY.getName());
+        if (StringUtils.isBlank(sourceRetentionPolicy)) {
+            sourceRetentionPolicy = DEFAULT_RETENTION_POLICY;
+        }
+        validateRetentionPolicy(sourceRetentionPolicy);
+        additionalProperties.put(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_POLICY.getName(),
+                sourceRetentionPolicy);
+
+        // Add targetCluster properties
+        Cluster targetCluster = ClusterHelper.getCluster(extensionProperties.getProperty(
+                HdfsSnapshotMirrorProperties.TARGET_CLUSTER.getName()));
+        if (targetCluster == null) {
+            LOG.error("Cluster entity {} not found", HdfsSnapshotMirrorProperties.TARGET_CLUSTER.getName());
+            throw new FalconException("Cluster entity "
+                    + HdfsSnapshotMirrorProperties.TARGET_CLUSTER.getName() + " not found");
+        }
+        additionalProperties.put(HdfsSnapshotMirrorProperties.TARGET_NN.getName(),
+                ClusterHelper.getStorageUrl(targetCluster));
+        additionalProperties.put(HdfsSnapshotMirrorProperties.TARGET_EXEC_URL.getName(),
+                ClusterHelper.getMREndPoint(targetCluster));
+        String targetKerberosPrincipal = ClusterHelper.getPropertyValue(targetCluster, SecurityUtil.NN_PRINCIPAL);
+        if (StringUtils.isBlank(targetKerberosPrincipal)) {
+            targetKerberosPrincipal = EMPTY_KERBEROS_PRINCIPAL;
+        }
+        additionalProperties.put(HdfsSnapshotMirrorProperties.TARGET_NN_KERBEROS_PRINCIPAL.getName(),
+                targetKerberosPrincipal);
+
+        String targetRetentionPolicy = extensionProperties.getProperty(
+                HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_POLICY.getName());
+        if (StringUtils.isBlank(targetRetentionPolicy)) {
+            targetRetentionPolicy = DEFAULT_RETENTION_POLICY;
+        }
+        validateRetentionPolicy(targetRetentionPolicy);
+        additionalProperties.put(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_POLICY.getName(),
+                targetRetentionPolicy);
+
+        // Add jobName and jobCluster properties.
+        String jobName = extensionProperties.getProperty(ExtensionProperties.JOB_NAME.getName());
+        if (StringUtils.isBlank(jobName)) {
+            throw new FalconException("Property "
+                    + ExtensionProperties.JOB_NAME.getName() + " cannot be null");
+        }
+        additionalProperties.put(HdfsSnapshotMirrorProperties.SNAPSHOT_JOB_NAME.getName(), jobName);
+
+        String jobClusterName = extensionProperties.getProperty(ExtensionProperties.CLUSTER_NAME.getName());
+        Cluster jobCluster = ClusterHelper.getCluster(jobClusterName);
+        if (jobCluster == null) {
+            LOG.error("Cluster entity {} not found", ExtensionProperties.CLUSTER_NAME.getName());
+            throw new FalconException("Cluster entity "
+                    + ExtensionProperties.CLUSTER_NAME.getName() + " not found");
+        }
+        return additionalProperties;
+    }
+
+    public static void validateRetentionPolicy(String retentionPolicy) throws FalconException {
+        if (!retentionPolicy.equalsIgnoreCase("delete")) {
+            throw new FalconException("Retention policy \"" + retentionPolicy + "\" is invalid");
+        }
+    }
+}
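
In practice the class above is driven through Extension.getEntities() (see ExtensionTest below); the following sketch only illustrates the validate/getAdditionalProperties contract directly (the wrapper class and merge step are illustrative):

    import java.util.Properties;

    import org.apache.falcon.FalconException;
    import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirroringExtension;

    public class SnapshotExtensionCallSketch {
        public static Properties resolve(Properties userProps) throws FalconException {
            HdfsSnapshotMirroringExtension extension = new HdfsSnapshotMirroringExtension();
            // Throws if a required property is missing, a cluster entity is unknown,
            // or either directory does not exist or is not snapshottable.
            extension.validate(userProps);
            // Adds defaults (distcpMaxMaps=1, distcpMapBandwidth=100, retention policy "delete")
            // plus per-cluster namenode, execute endpoint and kerberos principal properties.
            Properties merged = new Properties();
            merged.putAll(userProps);
            merged.putAll(extension.getAdditionalProperties(userProps));
            return merged;
        }
    }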

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java b/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java
index 92e9805..9e23894 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java
@@ -44,9 +44,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.TimeZone;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import java.util.TimeZone;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
----------------------------------------------------------------------
diff --git a/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
index ffd9336..b14d500 100644
--- a/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
+++ b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
@@ -20,6 +20,7 @@ package org.apache.falcon.extensions;
 
 import junit.framework.Assert;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.MiniHdfsClusterUtil;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.parser.EntityParserFactory;
@@ -34,12 +35,22 @@ import org.apache.falcon.entity.v0.process.PolicyType;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.extensions.mirroring.hdfs.HdfsMirroringExtension;
 import org.apache.falcon.extensions.mirroring.hdfs.HdfsMirroringExtensionProperties;
+import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties;
+import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirroringExtension;
 import org.apache.falcon.extensions.store.AbstractTestExtensionStore;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import java.io.ByteArrayInputStream;
+import java.io.File;
 import java.io.InputStream;
+import java.nio.file.Files;
 import java.util.List;
 import java.util.Properties;
 
@@ -58,9 +69,18 @@ public class ExtensionTest extends AbstractTestExtensionStore {
     private static final String SOURCE_CLUSTER = "primaryCluster";
     private static final String TARGETDIR = "/users/target/file1";
     private static final String TARGET_CLUSTER = "backupCluster";
+    private static final String NN_URI = "hdfs://localhost:54134";
+    private static final String RETENTION_POLICY = "delete";
+    private static final String RETENTION_AGE = "mins(5)";
+    private static final String RETENTION_NUM = "7";
+    private static final String TARGET_KERBEROS_PRINCIPAL = "nn/backup@REALM";
+
     private Extension extension;
+    private MiniDFSCluster miniDFSCluster;
+    private DistributedFileSystem miniDfs;
+    private File baseDir;
 
-    private static Properties getHdfsProperties() {
+    private static Properties getCommonProperties() {
         Properties properties = new Properties();
         properties.setProperty(ExtensionProperties.JOB_NAME.getName(),
                 JOB_NAME);
@@ -72,6 +92,11 @@ public class ExtensionTest extends AbstractTestExtensionStore {
                 VALIDITY_END);
         properties.setProperty(ExtensionProperties.FREQUENCY.getName(),
                 FREQUENCY);
+        return properties;
+    }
+
+    private static Properties getHdfsProperties() {
+        Properties properties = getCommonProperties();
         properties.setProperty(HdfsMirroringExtensionProperties.SOURCE_DIR.getName(),
                 SOURCEDIR);
         properties.setProperty(HdfsMirroringExtensionProperties.SOURCE_CLUSTER.getName(),
@@ -84,10 +109,52 @@ public class ExtensionTest extends AbstractTestExtensionStore {
         return properties;
     }
 
+    private static Properties getHdfsSnapshotExtensionProperties() {
+        Properties properties = getCommonProperties();
+        properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName(),
+                SOURCEDIR);
+        properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName(),
+                SOURCE_CLUSTER);
+        properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_POLICY.getName(),
+                RETENTION_POLICY);
+        properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_AGE_LIMIT.getName(),
+                RETENTION_AGE);
+        properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_NUMBER.getName(),
+                RETENTION_NUM);
+        properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_NN.getName(),
+                NN_URI);
+
+        properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName(),
+                TARGETDIR);
+        properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_CLUSTER.getName(),
+                TARGET_CLUSTER);
+        properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_POLICY.getName(),
+                RETENTION_POLICY);
+        properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_AGE_LIMIT.getName(),
+                RETENTION_AGE);
+        properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_NUMBER.getName(),
+                RETENTION_NUM);
+        properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_NN.getName(),
+                NN_URI);
+        properties.setProperty(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName(),
+                "5");
+        properties.setProperty(HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName(),
+                "100");
+        properties.setProperty(HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName(),
+                "false");
+
+        return properties;
+    }
+
     @BeforeClass
     public void init() throws Exception {
         extension = new Extension();
+        baseDir = Files.createTempDirectory("test_extensions_hdfs").toFile().getAbsoluteFile();
+        miniDFSCluster = MiniHdfsClusterUtil.initMiniDfs(MiniHdfsClusterUtil.EXTENSION_TEST_PORT, baseDir);
         initClusters();
+        miniDfs = miniDFSCluster.getFileSystem();
+        miniDfs.mkdirs(new Path(SOURCEDIR), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+        miniDfs.mkdirs(new Path(TARGETDIR), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
     }
 
     private void initClusters() throws Exception {
@@ -157,4 +224,100 @@ public class ExtensionTest extends AbstractTestExtensionStore {
 
         extension.getEntities(new HdfsMirroringExtension().getName(), props);
     }
+
+    @Test
+    public void testGetExtensionEntitiesForHdfsSnapshotMirroring() throws Exception {
+        ProcessEntityParser parser = (ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS);
+
+        miniDfs.allowSnapshot(new Path(SOURCEDIR));
+        miniDfs.allowSnapshot(new Path(TARGETDIR));
+
+        List<Entity> entities = extension.getEntities(new HdfsSnapshotMirroringExtension().getName(),
+                getHdfsSnapshotExtensionProperties());
+        if (entities == null || entities.isEmpty()) {
+            Assert.fail("Entities returned cannot be null or empty");
+        }
+
+        Assert.assertEquals(1, entities.size());
+        Entity entity = entities.get(0);
+        Assert.assertEquals(EntityType.PROCESS, entity.getEntityType());
+        parser.parse(new ByteArrayInputStream(entity.toString().getBytes()));
+
+        // Validate
+        Process processEntity = (Process) entity;
+        Assert.assertEquals(JOB_NAME, processEntity.getName());
+        org.apache.falcon.entity.v0.process.Cluster jobCluster = processEntity.getClusters().
+                getClusters().get(0);
+        Assert.assertEquals(JOB_CLUSTER_NAME, jobCluster.getName());
+        Assert.assertEquals(VALIDITY_START, SchemaHelper.formatDateUTC(jobCluster.getValidity().getStart()));
+        Assert.assertEquals(VALIDITY_END, SchemaHelper.formatDateUTC(jobCluster.getValidity().getEnd()));
+
+        Assert.assertEquals(FREQUENCY, processEntity.getFrequency().toString());
+        Assert.assertEquals("UTC", processEntity.getTimezone().getID());
+
+        Assert.assertEquals(EngineType.OOZIE, processEntity.getWorkflow().getEngine());
+        Assert.assertEquals(extensionStorePath + "/hdfs-snapshot-mirroring/libs",
+                processEntity.getWorkflow().getLib());
+        Assert.assertEquals(extensionStorePath
+                        + "/hdfs-snapshot-mirroring/resources/runtime/hdfs-snapshot-mirroring-workflow.xml",
+                processEntity.getWorkflow().getPath());
+
+        Properties props = EntityUtil.getEntityProperties(processEntity);
+
+        Assert.assertEquals(SOURCEDIR, props.getProperty("sourceSnapshotDir"));
+        Assert.assertEquals(SOURCE_CLUSTER, props.getProperty("sourceCluster"));
+        Assert.assertEquals(TARGETDIR, props.getProperty("targetSnapshotDir"));
+        Assert.assertEquals(TARGET_CLUSTER, props.getProperty("targetCluster"));
+        Assert.assertEquals(JOB_NAME, props.getProperty("snapshotJobName"));
+        Assert.assertEquals(HdfsSnapshotMirroringExtension.EMPTY_KERBEROS_PRINCIPAL,
+                props.getProperty("sourceNNKerberosPrincipal"));
+        Assert.assertEquals(TARGET_KERBEROS_PRINCIPAL, props.getProperty("targetNNKerberosPrincipal"));
+
+        //retry
+        Assert.assertEquals(3, processEntity.getRetry().getAttempts());
+        Assert.assertEquals(PolicyType.PERIODIC, processEntity.getRetry().getPolicy());
+        Assert.assertEquals("minutes(30)", processEntity.getRetry().getDelay().toString());
+    }
+
+
+    @Test(dependsOnMethods = "testGetExtensionEntitiesForHdfsSnapshotMirroring",
+            expectedExceptions = FalconException.class,
+            expectedExceptionsMessageRegExp = "sourceSnapshotDir /users/source/file1 does not allow snapshots.")
+    public void testHdfsSnapshotMirroringNonSnapshotableDir() throws Exception {
+        miniDfs.disallowSnapshot(new Path(SOURCEDIR));
+
+        List<Entity> entities = extension.getEntities(new HdfsSnapshotMirroringExtension().getName(),
+                getHdfsSnapshotExtensionProperties());
+        if (entities == null || entities.isEmpty()) {
+            Assert.fail("Entities returned cannot be null or empty");
+        }
+    }
+
+    @Test(expectedExceptions = FalconException.class,
+            expectedExceptionsMessageRegExp = "Missing extension property: sourceCluster")
+    public void testGetExtensionEntitiesForHdfsSnapshotMirroringMissingProperties() throws FalconException {
+        Properties props = getHdfsSnapshotExtensionProperties();
+        props.remove(HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName());
+        extension.getEntities(new HdfsSnapshotMirroringExtension().getName(), props);
+    }
+
+    @Test(dependsOnMethods = "testHdfsSnapshotMirroringNonSnapshotableDir",
+            expectedExceptions = FalconException.class,
+            expectedExceptionsMessageRegExp = "sourceSnapshotDir /users/source/file1 does not exist.")
+    public void testHdfsSnapshotMirroringNonExistingDir() throws Exception {
+        if (miniDfs.exists(new Path(SOURCEDIR))) {
+            miniDfs.delete(new Path(SOURCEDIR), true);
+        }
+
+        List<Entity> entities = extension.getEntities(new HdfsSnapshotMirroringExtension().getName(),
+                getHdfsSnapshotExtensionProperties());
+        if (entities == null || entities.isEmpty()) {
+            Assert.fail("Entities returned cannot be null or empty");
+        }
+    }
+
+    @AfterClass
+    public void cleanup() throws Exception {
+        MiniHdfsClusterUtil.cleanupDfs(miniDFSCluster, baseDir);
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
----------------------------------------------------------------------
diff --git a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
index 3462321..9dbacde 100644
--- a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
+++ b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
@@ -39,7 +39,11 @@ public class ExtensionStoreTest extends AbstractTestExtensionStore {
                 "hdfs-mirroring-template.xml", extensionStorePath
                         + "/hdfs-mirroring/resources/runtime/hdfs-mirroring-template.xml",
                 "hdfs-mirroring-workflow.xml", extensionStorePath
-                        + "/hdfs-mirroring/resources/runtime/hdfs-mirroring-workflow.xml"
+                        + "/hdfs-mirroring/resources/runtime/hdfs-mirroring-workflow.xml",
+                "hdfs-snapshot-mirroring-template.xml", extensionStorePath
+                        + "/hdfs-mirroring/resources/runtime/hdfs-snapshot-mirroring-template.xml",
+                "hdfs-snapshot-mirroring-workflow.xml", extensionStorePath
+                        + "/hdfs-mirroring/resources/runtime/hdfs-snapshot-mirroring-workflow.xml"
         );
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/test/resources/backup-cluster-0.1.xml
----------------------------------------------------------------------
diff --git a/extensions/src/test/resources/backup-cluster-0.1.xml b/extensions/src/test/resources/backup-cluster-0.1.xml
index c3ba6b9..27661be 100644
--- a/extensions/src/test/resources/backup-cluster-0.1.xml
+++ b/extensions/src/test/resources/backup-cluster-0.1.xml
@@ -22,7 +22,7 @@
     <interfaces>
         <interface type="readonly" endpoint="hftp://localhost:50010"
                    version="0.20.2"/>
-        <interface type="write" endpoint="hdfs://localhost:8020"
+        <interface type="write" endpoint="hdfs://localhost:54134"
                    version="0.20.2"/>
         <interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
         <interface type="workflow" endpoint="http://localhost:11000/oozie/"
@@ -40,5 +40,6 @@
         <property name="field1" value="value1"/>
         <property name="field2" value="value2"/>
         <property name="hive.metastore.client.socket.timeout" value="20"/>
+        <property name="dfs.namenode.kerberos.principal" value="nn/backup@REALM"/>
     </properties>
 </cluster>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/test/resources/hdfs-snapshot-mirroring-template.xml
----------------------------------------------------------------------
diff --git a/extensions/src/test/resources/hdfs-snapshot-mirroring-template.xml b/extensions/src/test/resources/hdfs-snapshot-mirroring-template.xml
new file mode 100644
index 0000000..29131da
--- /dev/null
+++ b/extensions/src/test/resources/hdfs-snapshot-mirroring-template.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<process name="##jobName##" xmlns="uri:falcon:process:0.1">
+    <clusters>
+        <!--  source  -->
+        <cluster name="##jobClusterName##">
+            <validity end="##jobValidityEnd##" start="##jobValidityStart##"/>
+        </cluster>
+    </clusters>
+
+    <tags/>
+
+    <parallel>1</parallel>
+    <!-- Replication needs to run only once to catch up -->
+    <order>LAST_ONLY</order>
+    <frequency>##jobFrequency##</frequency>
+    <timezone>##jobTimezone##</timezone>
+
+    <properties>
+        <property name="oozie.wf.subworkflow.classpath.inheritance" value="true"/>
+    </properties>
+
+    <workflow name="##jobWorkflowName##" engine="##jobWorkflowEngine##"
+              path="##jobWorkflowPath##" lib="##jobWorkflowLibPath##"/>
+    <retry policy="##jobRetryPolicy##" delay="##jobRetryDelay##" attempts="3"/>
+    <notification type="##jobNotificationType##" to="##jobNotificationReceivers##"/>
+    <ACL/>
+</process>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/test/resources/primary-cluster-0.1.xml
----------------------------------------------------------------------
diff --git a/extensions/src/test/resources/primary-cluster-0.1.xml b/extensions/src/test/resources/primary-cluster-0.1.xml
index a9694c2..f42924c 100644
--- a/extensions/src/test/resources/primary-cluster-0.1.xml
+++ b/extensions/src/test/resources/primary-cluster-0.1.xml
@@ -22,7 +22,7 @@
     <interfaces>
         <interface type="readonly" endpoint="hftp://localhost:50010"
                    version="0.20.2"/>
-        <interface type="write" endpoint="hdfs://localhost:8020"
+        <interface type="write" endpoint="hdfs://localhost:54134"
                    version="0.20.2"/>
         <interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
         <interface type="workflow" endpoint="http://localhost:11000/oozie/"

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f777dc9..8f4561c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,7 +122,7 @@
                 <activeByDefault>true</activeByDefault>
             </activation>
             <properties>
-                <hadoop.version>2.6.2</hadoop.version>
+                <hadoop.version>2.7.1</hadoop.version>
             </properties>
             <dependencyManagement>
                 <dependencies>
@@ -403,6 +403,41 @@
                 <module>addons/hivedr</module>
             </modules>
         </profile>
+
+        <profile>
+            <id>hdfs-snapshot-mirroring</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-enforcer-plugin</artifactId>
+                        <version>1.3.1</version>
+                        <executions>
+                            <execution>
+                                <id>enforce-property</id>
+                                <goals>
+                                    <goal>enforce</goal>
+                                </goals>
+                                <configuration>
+                                    <rules>
+                                        <requireProperty>
+                                            <property>hadoop.version</property>
+                                            <regex>^(2.7.*)</regex>
+                                            <regexMessage>HDFS Snapshot replication only works with hadoop version >= 2.7.0</regexMessage>
+                                        </requireProperty>
+                                    </rules>
+                                    <fail>true</fail>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+            <modules>
+                <module>addons/hdfs-snapshot-mirroring</module>
+            </modules>
+        </profile>
+
         <profile>
             <id>adf</id>
             <modules>
@@ -627,6 +662,12 @@
             </dependency>
 
             <dependency>
+                <groupId>org.apache.falcon</groupId>
+                <artifactId>falcon-extensions</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+
+            <dependency>
                 <groupId>javax.xml.bind</groupId>
                 <artifactId>jaxb-api</artifactId>
                 <version>2.1</version>

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/scheduler/src/test/resources/startup.properties
----------------------------------------------------------------------
diff --git a/scheduler/src/test/resources/startup.properties b/scheduler/src/test/resources/startup.properties
index 6216b70..46031e3 100644
--- a/scheduler/src/test/resources/startup.properties
+++ b/scheduler/src/test/resources/startup.properties
@@ -84,7 +84,7 @@ debug.libext.process.paths=${falcon.libext}
 
 
 ##### List of shared libraries for Falcon workflows #####
-*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3
+*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3,commons-el
 
 ######### Authentication Properties #########
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 3601e22..2f8f514 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -102,7 +102,7 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 *.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory
 
 ##### List of shared libraries for Falcon workflows #####
-*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3
+*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3,commons-el
 
 ##### Workflow Job Execution Completion listeners #####
 *.workflow.execution.listeners=org.apache.falcon.handler.SLAMonitoringHandler

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/src/main/assemblies/assembly-standalone.xml
----------------------------------------------------------------------
diff --git a/src/main/assemblies/assembly-standalone.xml b/src/main/assemblies/assembly-standalone.xml
index cc1486a..d3111b7 100644
--- a/src/main/assemblies/assembly-standalone.xml
+++ b/src/main/assemblies/assembly-standalone.xml
@@ -172,6 +172,40 @@
         </fileSet>
 
         <fileSet>
+            <directory>addons/extensions/hdfs-snapshot-mirroring/src/main/META</directory>
+            <outputDirectory>extensions/hdfs-snapshot-mirroring/META</outputDirectory>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
+            <directory>./</directory>
+            <outputDirectory>extensions/hdfs-snapshot-mirroring/libs/build</outputDirectory>
+            <excludes>
+                <exclude>*/**</exclude>
+            </excludes>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
+            <directory>./</directory>
+            <outputDirectory>extensions/hdfs-snapshot-mirroring/libs/runtime</outputDirectory>
+            <excludes>
+                <exclude>*/**</exclude>
+            </excludes>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
+            <directory>addons/extensions/hdfs-snapshot-mirroring/src/main/resources</directory>
+            <outputDirectory>extensions/hdfs-snapshot-mirroring/resources</outputDirectory>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
             <directory>addons/extensions/hive-mirroring/src/main/META</directory>
             <outputDirectory>extensions/hive-mirroring/META</outputDirectory>
             <fileMode>0755</fileMode>
@@ -247,6 +281,12 @@
         </file>
 
         <file>
+            <source>addons/extensions/hdfs-snapshot-mirroring/README</source>
+            <outputDirectory>extensions/hdfs-snapshot-mirroring</outputDirectory>
+            <fileMode>0755</fileMode>
+        </file>
+
+        <file>
             <source>addons/extensions/hive-mirroring/README</source>
             <outputDirectory>extensions/hive-mirroring</outputDirectory>
             <fileMode>0755</fileMode>

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/src/main/assemblies/distributed-package.xml
----------------------------------------------------------------------
diff --git a/src/main/assemblies/distributed-package.xml b/src/main/assemblies/distributed-package.xml
index 502018d..eb45c6f 100644
--- a/src/main/assemblies/distributed-package.xml
+++ b/src/main/assemblies/distributed-package.xml
@@ -150,6 +150,40 @@
         </fileSet>
 
         <fileSet>
+            <directory>addons/extensions/hdfs-snapshot-mirroring/src/main/META</directory>
+            <outputDirectory>extensions/hdfs-snapshot-mirroring/META</outputDirectory>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
+            <directory>./</directory>
+            <outputDirectory>extensions/hdfs-snapshot-mirroring/libs/build</outputDirectory>
+            <excludes>
+                <exclude>*/**</exclude>
+            </excludes>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
+            <directory>./</directory>
+            <outputDirectory>extensions/hdfs-snapshot-mirroring/libs/runtime</outputDirectory>
+            <excludes>
+                <exclude>*/**</exclude>
+            </excludes>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
+            <directory>addons/extensions/hdfs-snapshot-mirroring/src/main/resources</directory>
+            <outputDirectory>extensions/hdfs-snapshot-mirroring/resources</outputDirectory>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
             <directory>addons/extensions/hive-mirroring/src/main/META</directory>
             <outputDirectory>extensions/hive-mirroring/META</outputDirectory>
             <fileMode>0755</fileMode>
@@ -250,6 +284,12 @@
         </file>
 
         <file>
+            <source>../addons/extensions/hdfs-snapshot-mirroring/README</source>
+            <outputDirectory>extensions/hdfs-snapshot-mirroring</outputDirectory>
+            <fileMode>0755</fileMode>
+        </file>
+
+        <file>
             <source>../addons/extensions/hive-mirroring/README</source>
             <outputDirectory>extensions/hive-mirroring</outputDirectory>
             <fileMode>0755</fileMode>

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/src/main/assemblies/standalone-package.xml
----------------------------------------------------------------------
diff --git a/src/main/assemblies/standalone-package.xml b/src/main/assemblies/standalone-package.xml
index eac2e11..0b5c69a 100644
--- a/src/main/assemblies/standalone-package.xml
+++ b/src/main/assemblies/standalone-package.xml
@@ -155,6 +155,40 @@
         </fileSet>
 
         <fileSet>
+            <directory>addons/extensions/hdfs-snapshot-mirroring/src/main/META</directory>
+            <outputDirectory>extensions/hdfs-snapshot-mirroring/META</outputDirectory>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
+            <directory>./</directory>
+            <outputDirectory>extensions/hdfs-snapshot-mirroring/libs/build</outputDirectory>
+            <excludes>
+                <exclude>*/**</exclude>
+            </excludes>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
+            <directory>./</directory>
+            <outputDirectory>extensions/hdfs-snapshot-mirroring/libs/runtime</outputDirectory>
+            <excludes>
+                <exclude>*/**</exclude>
+            </excludes>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
+            <directory>addons/extensions/hdfs-snapshot-mirroring/src/main/resources</directory>
+            <outputDirectory>extensions/hdfs-snapshot-mirroring/resources</outputDirectory>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
             <directory>addons/extensions/hive-mirroring/src/main/META</directory>
             <outputDirectory>extensions/hive-mirroring/META</outputDirectory>
             <fileMode>0755</fileMode>
@@ -235,6 +269,12 @@
         </file>
 
         <file>
+            <source>../addons/extensions/hdfs-snapshot-mirroring/README</source>
+            <outputDirectory>extensions/hdfs-snapshot-mirroring</outputDirectory>
+            <fileMode>0755</fileMode>
+        </file>
+
+        <file>
             <source>../addons/extensions/hive-mirroring/README</source>
             <outputDirectory>extensions/hive-mirroring</outputDirectory>
             <fileMode>0755</fileMode>

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/test-util/pom.xml
----------------------------------------------------------------------
diff --git a/test-util/pom.xml b/test-util/pom.xml
index 5b4a8c8..9f60119 100644
--- a/test-util/pom.xml
+++ b/test-util/pom.xml
@@ -45,8 +45,14 @@
 
                 <dependency>
                     <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-auth</artifactId>
+                </dependency>
+
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
                     <artifactId>hadoop-hdfs</artifactId>
                     <classifier>tests</classifier>
+                    <scope>compile</scope>
                 </dependency>
 
                 <dependency>

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/test-util/src/main/java/org/apache/falcon/cluster/util/MiniHdfsClusterUtil.java
----------------------------------------------------------------------
diff --git a/test-util/src/main/java/org/apache/falcon/cluster/util/MiniHdfsClusterUtil.java b/test-util/src/main/java/org/apache/falcon/cluster/util/MiniHdfsClusterUtil.java
new file mode 100644
index 0000000..e1aee2e
--- /dev/null
+++ b/test-util/src/main/java/org/apache/falcon/cluster/util/MiniHdfsClusterUtil.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.cluster.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import java.io.File;
+
+/**
+ * Create a local MiniDFS cluster for testing snapshots et al.
+ */
+public final class MiniHdfsClusterUtil {
+
+    private MiniHdfsClusterUtil() {}
+
+    public static final int EXTENSION_TEST_PORT = 54134;
+    public static final int SNAPSHOT_EVICTION_TEST_PORT = 54135;
+    public static final int SNAPSHOT_REPL_TEST_PORT = 54136;
+
+
+    public static MiniDFSCluster initMiniDfs(int port, File baseDir) throws Exception {
+        Configuration conf = new Configuration();
+        conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+        MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+        builder.nameNodePort(port);
+        return builder.build();
+    }
+
+    public static void cleanupDfs(MiniDFSCluster miniDFSCluster, File baseDir) throws Exception {
+        miniDFSCluster.shutdown();
+        FileUtil.fullyDelete(baseDir);
+    }
+
+}
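
For context, a hedged sketch of how a test might drive the new MiniHdfsClusterUtil helper. The class name, temp directory, and HDFS paths below are illustrative assumptions rather than code from this commit; only initMiniDfs, cleanupDfs, and the port constants come from the class above.

    import org.apache.falcon.cluster.util.MiniHdfsClusterUtil;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    import java.io.File;
    import java.nio.file.Files;

    public class SnapshotMiniClusterSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical base dir; MiniDFSCluster keeps its storage under it.
            File baseDir = Files.createTempDirectory("falcon-mini-hdfs").toFile();
            MiniDFSCluster cluster = MiniHdfsClusterUtil.initMiniDfs(
                    MiniHdfsClusterUtil.SNAPSHOT_REPL_TEST_PORT, baseDir);
            try {
                DistributedFileSystem dfs = cluster.getFileSystem();
                Path dir = new Path("/apps/falcon/snapshot-repl");   // illustrative path
                dfs.mkdirs(dir);
                dfs.allowSnapshot(dir);          // mark the directory snapshottable
                dfs.createSnapshot(dir, "s1");   // take a named snapshot
            } finally {
                // Shuts the mini cluster down and deletes baseDir.
                MiniHdfsClusterUtil.cleanupDfs(cluster, baseDir);
            }
        }
    }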

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/unit/pom.xml
----------------------------------------------------------------------
diff --git a/unit/pom.xml b/unit/pom.xml
index f1ef463..b233acf 100644
--- a/unit/pom.xml
+++ b/unit/pom.xml
@@ -29,19 +29,31 @@
 
     <artifactId>falcon-unit</artifactId>
 
-    <dependencies>
+    <profiles>
+        <profile>
+            <id>hadoop-2</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <properties>
+                <hadoop.version>2.7.1</hadoop.version>
+            </properties>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </dependency>
+            </dependencies>
+        </profile>
+    </profiles>
 
+    <dependencies>
         <dependency>
             <groupId>org.apache.falcon</groupId>
             <artifactId>falcon-common</artifactId>
         </dependency>
 
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-client</artifactId>
-        </dependency>
-
-        <dependency>
             <groupId>org.apache.oozie</groupId>
             <artifactId>oozie-core</artifactId>
             <exclusions>

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java b/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java
index 3070689..595f75c 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java
@@ -17,10 +17,6 @@
  */
 package org.apache.falcon.unit;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
@@ -45,6 +41,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
@@ -71,6 +69,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RpcClientFactory;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+
 /**
  * A Dummy implementation of RpcClientFactory that does not do RPC.
  * This is required as OozieClient tries to connect to RM via RPC to kill jobs which fails in local mode.
@@ -179,6 +181,12 @@ public final class LocalFalconRPCClientFactory implements RpcClientFactory {
         }
 
         @Override
+        public GetLabelsToNodesResponse getLabelsToNodes(GetLabelsToNodesRequest getLabelsToNodesRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
         public GetClusterNodeLabelsResponse getClusterNodeLabels(GetClusterNodeLabelsRequest
                 getClusterNodeLabelsRequest) throws YarnException, IOException {
             return null;
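
The getLabelsToNodes override added above follows the factory's existing convention: every ApplicationClientProtocol method is a stub returning null, which is all OozieClient's kill path needs in local mode. A hypothetical alternative (not the commit's approach) achieves the same no-op behaviour with a dynamic proxy; it assumes every protocol method returns a reference type, which holds for the YARN protocol records involved here.

    import java.lang.reflect.Proxy;

    public final class NoOpProtocolSketch {
        private NoOpProtocolSketch() {}

        // Returns a proxy whose every method call is a no-op returning null.
        // Safe only for interfaces whose methods all return reference types.
        @SuppressWarnings("unchecked")
        public static <T> T noOp(Class<T> protocol) {
            return (T) Proxy.newProxyInstance(
                    protocol.getClassLoader(),
                    new Class<?>[]{protocol},
                    (proxy, method, args) -> null);
        }
    }

Usage would be NoOpProtocolSketch.noOp(ApplicationClientProtocol.class); the hand-written inner class in the commit trades that brevity for compile-time checking of the full protocol surface.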

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/unit/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/unit/src/main/resources/startup.properties b/unit/src/main/resources/startup.properties
index 4dfea31..0e404cc 100644
--- a/unit/src/main/resources/startup.properties
+++ b/unit/src/main/resources/startup.properties
@@ -79,7 +79,7 @@ debug.libext.process.paths=${falcon.libext}
 
 
 ##### List of shared libraries for Falcon workflows #####
-*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3
+*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3,commons-el
 
 ######### Authentication Properties #########
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index dad0581..3582be1 100644
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -88,6 +88,16 @@
             </dependencies>
         </profile>
         <profile>
+            <id>hdfs-snapshot-mirroring</id>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.falcon</groupId>
+                    <artifactId>falcon-hdfs-snapshot-mirroring</artifactId>
+                    <version>${project.version}</version>
+                </dependency>
+            </dependencies>
+        </profile>
+        <profile>
             <id>adf</id>
             <dependencies>
                 <dependency>

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/webapp/src/test/resources/startup.properties
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/startup.properties b/webapp/src/test/resources/startup.properties
index 3544f0a..58018f1 100644
--- a/webapp/src/test/resources/startup.properties
+++ b/webapp/src/test/resources/startup.properties
@@ -87,7 +87,7 @@ debug.libext.process.paths=${falcon.libext}
 
 
 ##### List of shared libraries for Falcon workflows #####
-*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3
+*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3,commons-el
 
 ######### Authentication Properties #########
 

