hive-commits mailing list archives

From: the...@apache.org
Subject: hive git commit: HIVE-16591 : DR for function Binaries on HDFS (Anishek Agarwal, reviewed by Sankar Hariappan, Thejas Nair)
Date: Wed, 31 May 2017 16:20:29 GMT
Repository: hive
Updated Branches:
  refs/heads/master 727a3dfcd -> f5b225021


HIVE-16591 : DR for function Binaries on HDFS (Anishek Agarwal, reviewed by Sankar Hariappan, Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f5b22502
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f5b22502
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f5b22502

Branch: refs/heads/master
Commit: f5b2250219a254c54886ebe9aa70b7dd5bc00b5f
Parents: 727a3df
Author: Anishek Agarwal <anishek@gmail.com>
Authored: Wed May 31 09:20:14 2017 -0700
Committer: Thejas M Nair <thejas@hortonworks.com>
Committed: Wed May 31 09:20:23 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +
 itests/hive-unit/pom.xml                        |   6 +
 .../hive/metastore/TestReplChangeManager.java   |  89 ++++++----
 ...TestReplicationScenariosAcrossInstances.java | 166 +++++++++++++----
 .../hadoop/hive/ql/parse/WarehouseInstance.java | 176 +++++++++++++------
 .../hadoop/hive/metastore/HiveMetaStore.java    |  14 +-
 .../hive/metastore/ReplChangeManager.java       |  51 ++++--
 .../apache/hadoop/hive/metastore/Warehouse.java |   4 +
 pom.xml                                         |   4 +-
 ql/pom.xml                                      |  13 ++
 .../hadoop/hive/ql/exec/ReplCopyTask.java       |  22 ++-
 .../ql/parse/ReplicationSemanticAnalyzer.java   |  50 +++---
 .../hadoop/hive/ql/parse/repl/PathBuilder.java  |  66 +++++++
 .../repl/dump/events/CreateFunctionHandler.java |   9 +-
 .../parse/repl/dump/io/FunctionSerializer.java  |  40 ++++-
 .../hive/ql/parse/repl/load/MetaData.java       |   2 +-
 .../load/message/CreateFunctionHandler.java     | 156 ++++++++++++++--
 .../PrimaryToReplicaResourceFunctionTest.java   |  87 +++++++++
 18 files changed, 756 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f5b22502/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 2dfc8b6..5344f36 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -449,6 +449,8 @@ public class HiveConf extends Configuration {
     REPLCMINTERVAL("hive.repl.cm.interval","3600s",
         new TimeValidator(TimeUnit.SECONDS),
         "Inteval for cmroot cleanup thread."),
+    REPL_FUNCTIONS_ROOT_DIR("hive.repl.replica.functions.root.dir","/user/hive/repl/functions/",
+        "Root directory on the replica warehouse where the repl sub-system will store jars from the primary warehouse"),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),

http://git-wip-us.apache.org/repos/asf/hive/blob/f5b22502/itests/hive-unit/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml
index 8adf309..ba9d7b9 100644
--- a/itests/hive-unit/pom.xml
+++ b/itests/hive-unit/pom.xml
@@ -393,6 +393,12 @@
       <version>${curator.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-library</artifactId>
+      <version>${hamcrest.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/hive/blob/f5b22502/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
index 3f9eec3..1495c1a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
@@ -47,6 +47,9 @@ import org.junit.Test;
 
 import com.google.common.collect.ImmutableMap;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 public class TestReplChangeManager {
   private static HiveMetaStoreClient client;
   private static HiveConf hiveConf;
@@ -163,28 +166,28 @@ public class TestReplChangeManager {
     createFile(part3Path, "p3");
     String path3Chksum = ReplChangeManager.getChksumString(part3Path, fs);
 
-    Assert.assertTrue(part1Path.getFileSystem(hiveConf).exists(part1Path));
-    Assert.assertTrue(part2Path.getFileSystem(hiveConf).exists(part2Path));
-    Assert.assertTrue(part3Path.getFileSystem(hiveConf).exists(part3Path));
+    assertTrue(part1Path.getFileSystem(hiveConf).exists(part1Path));
+    assertTrue(part2Path.getFileSystem(hiveConf).exists(part2Path));
+    assertTrue(part3Path.getFileSystem(hiveConf).exists(part3Path));
 
     ReplChangeManager cm = ReplChangeManager.getInstance(hiveConf);
     // verify cm.recycle(db, table, part) api moves file to cmroot dir
     int ret = cm.recycle(part1Path, false);
     Assert.assertEquals(ret, 1);
-    Path cmPart1Path = ReplChangeManager.getCMPath(part1Path, hiveConf, path1Chksum);
-    Assert.assertTrue(cmPart1Path.getFileSystem(hiveConf).exists(cmPart1Path));
+    Path cmPart1Path = ReplChangeManager.getCMPath(hiveConf, path1Chksum);
+    assertTrue(cmPart1Path.getFileSystem(hiveConf).exists(cmPart1Path));
 
     // Verify dropPartition recycle part files
     client.dropPartition(dbName, tblName, Arrays.asList("20160102"));
-    Assert.assertFalse(part2Path.getFileSystem(hiveConf).exists(part2Path));
-    Path cmPart2Path = ReplChangeManager.getCMPath(part2Path, hiveConf, path2Chksum);
-    Assert.assertTrue(cmPart2Path.getFileSystem(hiveConf).exists(cmPart2Path));
+    assertFalse(part2Path.getFileSystem(hiveConf).exists(part2Path));
+    Path cmPart2Path = ReplChangeManager.getCMPath(hiveConf, path2Chksum);
+    assertTrue(cmPart2Path.getFileSystem(hiveConf).exists(cmPart2Path));
 
     // Verify dropTable recycle partition files
     client.dropTable(dbName, tblName);
-    Assert.assertFalse(part3Path.getFileSystem(hiveConf).exists(part3Path));
-    Path cmPart3Path = ReplChangeManager.getCMPath(part3Path, hiveConf, path3Chksum);
-    Assert.assertTrue(cmPart3Path.getFileSystem(hiveConf).exists(cmPart3Path));
+    assertFalse(part3Path.getFileSystem(hiveConf).exists(part3Path));
+    Path cmPart3Path = ReplChangeManager.getCMPath(hiveConf, path3Chksum);
+    assertTrue(cmPart3Path.getFileSystem(hiveConf).exists(cmPart3Path));
 
     client.dropDatabase(dbName, true, true);
   }
@@ -233,28 +236,28 @@ public class TestReplChangeManager {
     createFile(filePath3, "f3");
     String fileChksum3 = ReplChangeManager.getChksumString(filePath3, fs);
 
-    Assert.assertTrue(filePath1.getFileSystem(hiveConf).exists(filePath1));
-    Assert.assertTrue(filePath2.getFileSystem(hiveConf).exists(filePath2));
-    Assert.assertTrue(filePath3.getFileSystem(hiveConf).exists(filePath3));
+    assertTrue(filePath1.getFileSystem(hiveConf).exists(filePath1));
+    assertTrue(filePath2.getFileSystem(hiveConf).exists(filePath2));
+    assertTrue(filePath3.getFileSystem(hiveConf).exists(filePath3));
 
     ReplChangeManager cm = ReplChangeManager.getInstance(hiveConf);
     // verify cm.recycle(Path) api moves file to cmroot dir
     cm.recycle(filePath1, false);
-    Assert.assertFalse(filePath1.getFileSystem(hiveConf).exists(filePath1));
+    assertFalse(filePath1.getFileSystem(hiveConf).exists(filePath1));
 
-    Path cmPath1 = ReplChangeManager.getCMPath(filePath1, hiveConf, fileChksum1);
-    Assert.assertTrue(cmPath1.getFileSystem(hiveConf).exists(cmPath1));
+    Path cmPath1 = ReplChangeManager.getCMPath(hiveConf, fileChksum1);
+    assertTrue(cmPath1.getFileSystem(hiveConf).exists(cmPath1));
 
     // Verify dropTable recycle table files
     client.dropTable(dbName, tblName);
 
-    Path cmPath2 = ReplChangeManager.getCMPath(filePath2, hiveConf, fileChksum2);
-    Assert.assertFalse(filePath2.getFileSystem(hiveConf).exists(filePath2));
-    Assert.assertTrue(cmPath2.getFileSystem(hiveConf).exists(cmPath2));
+    Path cmPath2 = ReplChangeManager.getCMPath(hiveConf, fileChksum2);
+    assertFalse(filePath2.getFileSystem(hiveConf).exists(filePath2));
+    assertTrue(cmPath2.getFileSystem(hiveConf).exists(cmPath2));
 
-    Path cmPath3 = ReplChangeManager.getCMPath(filePath3, hiveConf, fileChksum3);
-    Assert.assertFalse(filePath3.getFileSystem(hiveConf).exists(filePath3));
-    Assert.assertTrue(cmPath3.getFileSystem(hiveConf).exists(cmPath3));
+    Path cmPath3 = ReplChangeManager.getCMPath(hiveConf, fileChksum3);
+    assertFalse(filePath3.getFileSystem(hiveConf).exists(filePath3));
+    assertTrue(cmPath3.getFileSystem(hiveConf).exists(cmPath3));
 
     client.dropDatabase(dbName, true, true);
   }
@@ -294,17 +297,17 @@ public class TestReplChangeManager {
     ReplChangeManager.getInstance(hiveConf).recycle(dirTbl2, false);
     ReplChangeManager.getInstance(hiveConf).recycle(dirTbl3, true);
 
-    Assert.assertTrue(fs.exists(ReplChangeManager.getCMPath(part11, hiveConf, fileChksum11)));
-    Assert.assertTrue(fs.exists(ReplChangeManager.getCMPath(part12, hiveConf, fileChksum12)));
-    Assert.assertTrue(fs.exists(ReplChangeManager.getCMPath(part21, hiveConf, fileChksum21)));
-    Assert.assertTrue(fs.exists(ReplChangeManager.getCMPath(part22, hiveConf, fileChksum22)));
-    Assert.assertTrue(fs.exists(ReplChangeManager.getCMPath(part31, hiveConf, fileChksum31)));
-    Assert.assertTrue(fs.exists(ReplChangeManager.getCMPath(part32, hiveConf, fileChksum32)));
+    assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, fileChksum11)));
+    assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, fileChksum12)));
+    assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, fileChksum21)));
+    assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, fileChksum22)));
+    assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, fileChksum31)));
+    assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, fileChksum32)));
 
-    fs.setTimes(ReplChangeManager.getCMPath(part11, hiveConf, fileChksum11), now - 86400*1000*2, now - 86400*1000*2);
-    fs.setTimes(ReplChangeManager.getCMPath(part21, hiveConf, fileChksum21), now - 86400*1000*2, now - 86400*1000*2);
-    fs.setTimes(ReplChangeManager.getCMPath(part31, hiveConf, fileChksum31), now - 86400*1000*2, now - 86400*1000*2);
-    fs.setTimes(ReplChangeManager.getCMPath(part32, hiveConf, fileChksum32), now - 86400*1000*2, now - 86400*1000*2);
+    fs.setTimes(ReplChangeManager.getCMPath(hiveConf, fileChksum11), now - 86400*1000*2, now - 86400*1000*2);
+    fs.setTimes(ReplChangeManager.getCMPath(hiveConf, fileChksum21), now - 86400*1000*2, now - 86400*1000*2);
+    fs.setTimes(ReplChangeManager.getCMPath(hiveConf, fileChksum31), now - 86400*1000*2, now - 86400*1000*2);
+    fs.setTimes(ReplChangeManager.getCMPath(hiveConf, fileChksum32), now - 86400*1000*2, now - 86400*1000*2);
 
     ReplChangeManager.scheduleCMClearer(hiveConf);
 
@@ -317,14 +320,22 @@ public class TestReplChangeManager {
       if (end - start > 5000) {
         Assert.fail("timeout, cmroot has not been cleared");
       }
-      if (!fs.exists(ReplChangeManager.getCMPath(part11, hiveConf, fileChksum11)) &&
-          fs.exists(ReplChangeManager.getCMPath(part12, hiveConf, fileChksum12)) &&
-          !fs.exists(ReplChangeManager.getCMPath(part21, hiveConf, fileChksum21)) &&
-          fs.exists(ReplChangeManager.getCMPath(part22, hiveConf, fileChksum22)) &&
-          !fs.exists(ReplChangeManager.getCMPath(part31, hiveConf, fileChksum31)) &&
-          !fs.exists(ReplChangeManager.getCMPath(part31, hiveConf, fileChksum31))) {
+      if (!fs.exists(ReplChangeManager.getCMPath(hiveConf, fileChksum11)) &&
+          fs.exists(ReplChangeManager.getCMPath(hiveConf, fileChksum12)) &&
+          !fs.exists(ReplChangeManager.getCMPath(hiveConf, fileChksum21)) &&
+          fs.exists(ReplChangeManager.getCMPath(hiveConf, fileChksum22)) &&
+          !fs.exists(ReplChangeManager.getCMPath(hiveConf, fileChksum31)) &&
+          !fs.exists(ReplChangeManager.getCMPath(hiveConf, fileChksum32))) {
         cleared = true;
       }
     } while (!cleared);
   }
+
+  @Test
+  public void shouldIdentifyCMURIs() {
+    assertTrue(ReplChangeManager
+        .isCMFileUri(new Path("hdfs://localhost:90000/somepath/adir/", "ab.jar#e239s2233"), fs));
+    assertFalse(ReplChangeManager
+        .isCMFileUri(new Path("/somepath/adir/", "ab.jar"), fs));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f5b22502/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 3c1ef08..6713dff 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -1,23 +1,30 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  <p>
+  http://www.apache.org/licenses/LICENSE-2.0
+  <p>
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
  */
 package org.apache.hadoop.hive.ql.parse;
 
-import org.junit.After;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
+import org.apache.hadoop.hive.ql.util.DependencyResolver;
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -27,44 +34,56 @@ import org.junit.rules.TestRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 
 public class TestReplicationScenariosAcrossInstances {
   @Rule
   public final TestName testName = new TestName();
 
   @Rule
-  public TestRule replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<String>());
+  public TestRule replV1BackwardCompat;
 
   protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
 
   private static WarehouseInstance primary, replica;
 
-  @BeforeClass
-  public static void classLevelSetup() throws Exception {
-    primary = new WarehouseInstance();
-    replica = new WarehouseInstance();
-  }
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("dfs.client.use.datanode.hostname", "true");
+    MiniDFSCluster miniDFSCluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+    primary = new WarehouseInstance(LOG, miniDFSCluster);
+    replica = new WarehouseInstance(LOG, miniDFSCluster);
+  }
+
+  @AfterClass
+  public static void classLevelTearDown() throws IOException {
+    primary.close();
+    replica.close();
+  }
 
   private String primaryDbName, replicatedDbName;
 
   @Before
   public void setup() throws Throwable {
+    replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>());
     primaryDbName = testName.getMethodName() + "_" + System.currentTimeMillis();
     replicatedDbName = "replicated_" + primaryDbName;
     primary.run("create database " + primaryDbName);
   }
 
-  @After
-  public void tearDown() throws Throwable {
-    primary.run(dropCommand(primaryDbName));
-    replica.run(dropCommand(replicatedDbName));
-  }
-
-  private String dropCommand(String dbName) {
-    return "drop database if exists " + dbName + " cascade ";
-  }
-
   @Test
   public void testCreateFunctionIncrementalReplication() throws Throwable {
     WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);
@@ -73,10 +92,11 @@ public class TestReplicationScenariosAcrossInstances {
         .verify(bootStrapDump.lastReplicationId);
 
     primary.run("CREATE FUNCTION " + primaryDbName
-        + ".testFunction as 'com.yahoo.sketches.hive.theta.DataToSketchUDAF' "
-        + "using jar  'ivy://com.yahoo.datasketches:sketches-hive:0.8.2'");
+        + ".testFunction as 'hivemall.tools.string.StopwordUDF' "
+        + "using jar  'ivy://io.github.myui:hivemall:0.4.0-2'");
 
-    WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
+    WarehouseInstance.Tuple incrementalDump =
+        primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
     replica.load(replicatedDbName, incrementalDump.dumpLocation)
         .run("REPL STATUS " + replicatedDbName)
         .verify(incrementalDump.lastReplicationId)
@@ -96,7 +116,8 @@ public class TestReplicationScenariosAcrossInstances {
 
     primary.run("Drop FUNCTION " + primaryDbName + ".testFunction ");
 
-    WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
+    WarehouseInstance.Tuple incrementalDump =
+        primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
     replica.load(replicatedDbName, incrementalDump.dumpLocation)
         .run("REPL STATUS " + replicatedDbName)
         .verify(incrementalDump.lastReplicationId)
@@ -107,8 +128,8 @@ public class TestReplicationScenariosAcrossInstances {
   @Test
   public void testBootstrapFunctionReplication() throws Throwable {
     primary.run("CREATE FUNCTION " + primaryDbName
-        + ".testFunction as 'com.yahoo.sketches.hive.theta.DataToSketchUDAF' "
-        + "using jar  'ivy://com.yahoo.datasketches:sketches-hive:0.8.2'");
+        + ".testFunction as 'hivemall.tools.string.StopwordUDF' "
+        + "using jar  'ivy://io.github.myui:hivemall:0.4.0-2'");
     WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);
 
     replica.load(replicatedDbName, bootStrapDump.dumpLocation)
@@ -116,4 +137,71 @@ public class TestReplicationScenariosAcrossInstances {
         .verify(replicatedDbName + ".testFunction");
   }
 
+  @Test
+  public void testCreateFunctionWithFunctionBinaryJarsOnHDFS() throws Throwable {
+    Dependencies dependencies = dependencies("ivy://io.github.myui:hivemall:0.4.0-2", primary);
+    String jarSubString = dependencies.toJarSubSql();
+
+    primary.run("CREATE FUNCTION " + primaryDbName
+        + ".anotherFunction as 'hivemall.tools.string.StopwordUDF' "
+        + "using " + jarSubString);
+
+    WarehouseInstance.Tuple tuple = primary.dump(primaryDbName, null);
+
+    replica.load(replicatedDbName, tuple.dumpLocation)
+        .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'")
+        .verify(replicatedDbName + ".anotherFunction");
+
+    FileStatus[] fileStatuses = replica.miniDFSCluster.getFileSystem().globStatus(
+        new Path(
+            replica.functionsRoot + "/" + replicatedDbName.toLowerCase() + "/anotherfunction/*/*")
+        , path -> path.toString().endsWith("jar"));
+    List<String> expectedDependenciesNames = dependencies.jarNames();
+    assertThat(fileStatuses.length, is(equalTo(expectedDependenciesNames.size())));
+    List<String> jars = Arrays.stream(fileStatuses).map(f -> {
+      String[] splits = f.getPath().toString().split("/");
+      return splits[splits.length - 1];
+    }).collect(Collectors.toList());
+
+    assertThat(jars, containsInAnyOrder(expectedDependenciesNames.toArray()));
+  }
+
+  static class Dependencies {
+    private final List<Path> fullQualifiedJarPaths;
+
+    Dependencies(List<Path> fullQualifiedJarPaths) {
+      this.fullQualifiedJarPaths = fullQualifiedJarPaths;
+    }
+
+    private String toJarSubSql() {
+      return StringUtils.join(
+          fullQualifiedJarPaths.stream().map(p -> "jar '" + p + "'").collect(Collectors.toList()),
+          ","
+      );
+    }
+
+    private List<String> jarNames() {
+      return fullQualifiedJarPaths.stream().map(p -> {
+        String[] splits = p.toString().split("/");
+        return splits[splits.length - 1];
+      }).collect(Collectors.toList());
+    }
+  }
+
+  private Dependencies dependencies(String ivyPath, WarehouseInstance onWarehouse)
+      throws IOException, URISyntaxException, SemanticException {
+    List<URI> localUris = new DependencyResolver().downloadDependencies(new URI(ivyPath));
+    List<Path> remotePaths = onWarehouse.copyToHDFS(localUris);
+    List<Path> collect =
+        remotePaths.stream().map(r -> {
+          try {
+            return PathBuilder
+                .fullyQualifiedHDFSUri(r, onWarehouse.miniDFSCluster.getFileSystem());
+
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }).collect(Collectors.toList());
+    return new Dependencies(collect);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f5b22502/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 7271eae..a35f7b2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -1,102 +1,131 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  <p>
+  http://www.apache.org/licenses/LICENSE-2.0
+  <p>
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
  */
 
 package org.apache.hadoop.hive.ql.parse;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.hcatalog.api.repl.ReplicationV1CompatRule;
 import org.apache.hive.hcatalog.listener.DbNotificationListener;
-import org.junit.rules.TestRule;
+import org.codehaus.plexus.util.ExceptionUtils;
+import org.slf4j.Logger;
 
+import java.io.Closeable;
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
-class WarehouseInstance {
+class WarehouseInstance implements Closeable {
+  final String functionsRoot;
+  private Logger logger;
   private Driver driver;
+  private HiveConf hiveConf;
+  MiniDFSCluster miniDFSCluster;
   private HiveMetaStoreClient client;
-  private HiveConf hconf;
 
-  private static int schemaNameCounter = 0;
+  private static int uniqueIdentifier = 0;
+
   private final static String LISTENER_CLASS = DbNotificationListener.class.getCanonicalName();
 
-  /**
-   * This will be used to allow the primary and replica warehouse to be the same instance of
-   * hive server
-   */
-  WarehouseInstance(WarehouseInstance other){
-    this.driver = other.driver;
-    this.client = other.client;
-    this.hconf = other.hconf;
+  WarehouseInstance(Logger logger, MiniDFSCluster cluster) throws Exception {
+    this.logger = logger;
+    this.miniDFSCluster = cluster;
+    assert miniDFSCluster.isClusterUp();
+    assert miniDFSCluster.isDataNodeUp();
+    DistributedFileSystem fs = miniDFSCluster.getFileSystem();
+
+    Path cmRootPath = mkDir(fs, "/cmroot" + uniqueIdentifier);
+    this.functionsRoot = mkDir(fs, "/functions" + uniqueIdentifier).toString();
+    initialize(cmRootPath.toString());
   }
 
-  WarehouseInstance() throws Exception {
-    hconf = new HiveConf(TestReplicationScenarios.class);
+  private void initialize(String cmRoot) throws Exception {
+    hiveConf = new HiveConf(miniDFSCluster.getConfiguration(0), TestReplicationScenarios.class);
     String metaStoreUri = System.getProperty("test." + HiveConf.ConfVars.METASTOREURIS.varname);
     String hiveWarehouseLocation = System.getProperty("test.warehouse.dir", "/tmp")
         + Path.SEPARATOR
         + TestReplicationScenarios.class.getCanonicalName().replace('.', '_')
         + "_"
         + System.nanoTime();
-
     if (metaStoreUri != null) {
-      hconf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
-      //        useExternalMS = true;
+      hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
       return;
     }
 
     // turn on db notification listener on meta store
-    hconf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS, LISTENER_CLASS);
-    hconf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
-    hconf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
-    hconf.setVar(HiveConf.ConfVars.REPLCMDIR, hiveWarehouseLocation + "/cmroot/");
-    String schemaName = "APP" + schemaNameCounter++;
+    hiveConf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS, LISTENER_CLASS);
+    hiveConf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
+    hiveConf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
+    hiveConf.setVar(HiveConf.ConfVars.REPLCMDIR, cmRoot);
+    hiveConf.setVar(HiveConf.ConfVars.REPL_FUNCTIONS_ROOT_DIR, functionsRoot);
+    String schemaName = "APP" + uniqueIdentifier;
     System.setProperty("datanucleus.mapping.Schema", schemaName);
-    hconf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY,
+    hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY,
         "jdbc:derby:memory:${test.tmp.dir}/" + schemaName + ";create=true");
-    int metaStorePort = MetaStoreUtils.startMetaStore(hconf);
-    hconf.setVar(HiveConf.ConfVars.REPLDIR, hiveWarehouseLocation + "/hrepl/");
-    hconf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + metaStorePort);
-    hconf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
-    hconf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
-    hconf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
-    hconf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+
+    int metaStorePort = MetaStoreUtils.startMetaStore(hiveConf);
+    hiveConf.setVar(HiveConf.ConfVars.REPLDIR,
+        hiveWarehouseLocation + "/hrepl" + uniqueIdentifier + "/");
+    hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + metaStorePort);
+    hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
     System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
     System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
 
     Path testPath = new Path(hiveWarehouseLocation);
-    FileSystem fs = FileSystem.get(testPath.toUri(), hconf);
-    fs.mkdirs(testPath);
+    FileSystem testPathFileSystem = FileSystem.get(testPath.toUri(), hiveConf);
+    testPathFileSystem.mkdirs(testPath);
+
+    driver = new Driver(hiveConf);
+    SessionState.start(new CliSessionState(hiveConf));
+    client = new HiveMetaStoreClient(hiveConf);
+    // change the value for the next instance.
+    ++uniqueIdentifier;
+  }
 
-    driver = new Driver(hconf);
-    SessionState.start(new CliSessionState(hconf));
-    client = new HiveMetaStoreClient(hconf);
+  private Path mkDir(DistributedFileSystem fs, String pathString)
+      throws IOException, SemanticException {
+    Path path = new Path(pathString);
+    fs.mkdir(path, new FsPermission("777"));
+    return PathBuilder.fullyQualifiedHDFSUri(path, fs);
   }
 
   private int next = 0;
@@ -161,8 +190,8 @@ class WarehouseInstance {
    */
   private void verifyResults(String[] data) throws IOException {
     List<String> results = getOutput();
-    TestReplicationScenariosAcrossInstances.LOG.info("Expecting {}", data);
-    TestReplicationScenariosAcrossInstances.LOG.info("Got {}", results);
+    logger.info("Expecting {}", StringUtils.join(data, ","));
+    logger.info("Got {}", results);
     assertEquals(data.length, results.size());
     for (int i = 0; i < data.length; i++) {
       assertEquals(data[i].toLowerCase(), results.get(i).toLowerCase());
@@ -174,7 +203,7 @@ class WarehouseInstance {
     try {
       driver.getResults(results);
     } catch (CommandNeedRetryException e) {
-      TestReplicationScenariosAcrossInstances.LOG.warn(e.getMessage(), e);
+      logger.warn(e.getMessage(), e);
       throw new RuntimeException(e);
     }
     return results;
@@ -182,12 +211,45 @@ class WarehouseInstance {
 
   private void printOutput() throws IOException {
     for (String s : getOutput()) {
-      TestReplicationScenariosAcrossInstances.LOG.info(s);
+      logger.info(s);
     }
   }
 
-  public ReplicationV1CompatRule getReplivationV1CompatRule(List<String> testsToSkip){
-    return new ReplicationV1CompatRule(client,hconf,testsToSkip);
+  ReplicationV1CompatRule getReplivationV1CompatRule(List<String> testsToSkip) {
+    return new ReplicationV1CompatRule(client, hiveConf, testsToSkip);
+
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (miniDFSCluster != null && miniDFSCluster.isClusterUp()) {
+      miniDFSCluster.shutdown();
+    }
+  }
+
+  List<Path> copyToHDFS(List<URI> localUris) throws IOException, SemanticException {
+    DistributedFileSystem fs = miniDFSCluster.getFileSystem();
+    Path destinationBasePath = new Path("/", String.valueOf(System.nanoTime()));
+    mkDir(fs, destinationBasePath.toString());
+    localUris.forEach(uri -> {
+      Path localPath = new Path(uri);
+      try {
+        FileSystem localFs = localPath.getFileSystem(hiveConf);
+        boolean success = FileUtils
+            .copy(localFs, localPath, fs, destinationBasePath, false, false, hiveConf);
+        if (!success) {
+          fail("FileUtils could not copy local uri " + localPath.toString() + " to hdfs");
+        }
+      } catch (IOException e) {
+        String message = "error on copy of local uri " + localPath.toString() + " to hdfs";
+        logger.error(message, e);
+        fail(message + ExceptionUtils.getFullStackTrace(e));
+      }
+    });
+
+    List<FileStatus> fileStatuses =
+        Arrays.asList(fs.globStatus(new Path(destinationBasePath, "*")));
+    return fileStatuses.stream().map(FileStatus::getPath).collect(Collectors.toList());
   }
 
   static class Tuple {

http://git-wip-us.apache.org/repos/asf/hive/blob/f5b22502/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 52bfb26..e886540 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -6377,7 +6377,20 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         if (func == null) {
           throw new NoSuchObjectException("Function " + funcName + " does not exist");
         }
+        // If the copy of a jar into change management fails, we fail the metastore transaction:
+        // the user might delete the jars on HDFS externally after dropping the function, so a
+        // retained copy is required for incremental replication to work correctly.
+        if (func.getResourceUris() != null && !func.getResourceUris().isEmpty()) {
+          for (ResourceUri uri : func.getResourceUris()) {
+            if (uri.getUri().toLowerCase().startsWith("hdfs:")) {
+              wh.addToChangeManagement(new Path(uri.getUri()));
+            }
+          }
+        }
 
+        // If the metastore operation below fails, we leave change management untouched and only
+        // fail the transaction; an extra copy of the jar in change management causes no harm,
+        // since the cleaner thread will remove it once it expires.
         ms.dropFunction(dbName, funcName);
         if (transactionalListeners.size() > 0) {
           transactionalListenerResponses =
@@ -6385,7 +6398,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
                                                     EventType.DROP_FUNCTION,
                                                     new DropFunctionEvent(func, true, this));
         }
-
         success = ms.commitTransaction();
       } finally {
         if (!success) {

http://git-wip-us.apache.org/repos/asf/hive/blob/f5b22502/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
index 51e4627..c955470 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -100,6 +101,34 @@ public class ReplChangeManager {
     }
   };
 
+  void addFile(Path path) throws MetaException {
+    if (!enabled) {
+      return;
+    }
+    try {
+      if (fs.isDirectory(path)) {
+        throw new IllegalArgumentException(path + " cannot be a directory");
+      }
+      Path cmPath = getCMPath(hiveConf, getChksumString(path, fs));
+      boolean copySuccessful = FileUtils
+          .copy(path.getFileSystem(hiveConf), path, cmPath.getFileSystem(hiveConf), cmPath, false,
+              false, hiveConf);
+      if (!copySuccessful) {
+        LOG.debug("A file with the same content as " + path.toString() + " already exists, ignoring it");
+      } else {
+        fs.setOwner(cmPath, msUser, msGroup);
+        try {
+          fs.setXAttr(cmPath, ORIG_LOC_TAG, path.toString().getBytes());
+        } catch (UnsupportedOperationException e) {
+          LOG.warn("Error setting xattr for " + path.toString());
+        }
+      }
+    } catch (Exception exception) {
+      throw new MetaException(StringUtils.stringifyException(exception));
+    }
+  }
+
+
   /***
    * Move a path into cmroot. If the path is a directory (of a partition, or table if nonpartitioned),
    *   recursively move files inside the directory to cmroot. Note the table must be a managed table.
@@ -122,7 +151,7 @@ public class ReplChangeManager {
           count += recycle(file.getPath(), ifPurge);
         }
       } else {
-        Path cmPath = getCMPath(path, hiveConf, getChksumString(path, fs));
+        Path cmPath = getCMPath(hiveConf, getChksumString(path, fs));
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("Moving " + path.toString() + " to " + cmPath.toString());
@@ -198,16 +227,15 @@ public class ReplChangeManager {
    * Convert a path of file inside a partition or table (if non-partitioned)
    *   to a deterministic location of cmroot. So user can retrieve the file back
    *   with the original location plus checksum.
-   * @param path original path inside partition or table
    * @param conf
-   * @param chksum checksum of the file, can be retrieved by {@link getCksumString}
+   * @param checkSum checksum of the file, can be retrieved by {@link #getChksumString}
    * @return
    * @throws IOException
    * @throws MetaException
    */
-  static public Path getCMPath(Path path, Configuration conf, String chksum)
+  static Path getCMPath(Configuration conf, String checkSum)
       throws IOException, MetaException {
-    String newFileName = chksum;
+    String newFileName = checkSum;
     int maxLength = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY,
         DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_DEFAULT);
 
@@ -215,9 +243,7 @@ public class ReplChangeManager {
       newFileName = newFileName.substring(0, maxLength-1);
     }
 
-    Path cmPath = new Path(cmroot, newFileName);
-
-    return cmPath;
+    return new Path(cmroot, newFileName);
   }
 
   /***
@@ -238,14 +264,14 @@ public class ReplChangeManager {
       }
 
       if (!srcFs.exists(src)) {
-        return srcFs.getFileStatus(getCMPath(src, conf, chksumString));
+        return srcFs.getFileStatus(getCMPath(conf, chksumString));
       }
 
       String currentChksumString = getChksumString(src, srcFs);
       if (currentChksumString == null || chksumString.equals(currentChksumString)) {
         return srcFs.getFileStatus(src);
       } else {
-        return srcFs.getFileStatus(getCMPath(src, conf, chksumString));
+        return srcFs.getFileStatus(getCMPath(conf, chksumString));
       }
     } catch (IOException e) {
       throw new MetaException(StringUtils.stringifyException(e));
@@ -283,6 +309,11 @@ public class ReplChangeManager {
     return result;
   }
 
+  public static boolean isCMFileUri(Path fromPath, FileSystem srcFs) {
+    String[] result = getFileWithChksumFromURI(fromPath.toString());
+    return result[1] != null;
+  }
+
   /**
    * Thread to clear old files of cmroot recursively
    */
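
The URI convention behind the new isCMFileUri is visible in the test added to
TestReplChangeManager above: the file checksum rides along as a '#' fragment on the URI. A
minimal round-trip sketch, assuming only what the diff itself shows (encodeFileUri appears
further below in FunctionSerializer; the sample path and checksum here are made up):

    import org.apache.hadoop.hive.metastore.ReplChangeManager;

    String encoded = ReplChangeManager.encodeFileUri(
        "hdfs://namenode:8020/apps/fn/ab.jar", "e239s2233");
    // expected shape: hdfs://namenode:8020/apps/fn/ab.jar#e239s2233
    String[] parts = ReplChangeManager.getFileWithChksumFromURI(encoded);
    // parts[0] is the plain file path, parts[1] the checksum (null when no fragment
    // is present), which is the null check isCMFileUri performs.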

http://git-wip-us.apache.org/repos/asf/hive/blob/f5b22502/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java b/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
index 8134ab2..053a0de 100755
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
@@ -206,6 +206,10 @@ public class Warehouse {
     return false;
   }
 
+  void addToChangeManagement(Path file) throws MetaException {
+    cm.addFile(file);
+  }
+
   public boolean deleteDir(Path f, boolean recursive) throws MetaException {
     return deleteDir(f, recursive, false);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/f5b22502/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e3ff84f..70518d1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -143,7 +143,7 @@
     <h2database.version>1.3.166</h2database.version>
     <hadoop.version>2.8.0</hadoop.version>
     <hadoop.bin.path>${basedir}/${hive.path.to.root}/testutils/hadoop</hadoop.bin.path>
-    <hamcrest.version>1.1</hamcrest.version>
+    <hamcrest.version>1.3</hamcrest.version>
     <hbase.version>1.1.1</hbase.version>
     <!-- required for logging test to avoid including hbase which pulls disruptor transitively -->
     <disruptor.version>3.3.0</disruptor.version>
@@ -178,7 +178,7 @@
     <log4j2.version>2.6.2</log4j2.version>
     <opencsv.version>2.3</opencsv.version>
     <orc.version>1.3.3</orc.version>
-    <mockito-all.version>1.9.5</mockito-all.version>
+    <mockito-all.version>1.10.19</mockito-all.version>
     <mina.version>2.0.0-M5</mina.version>
     <netty.version>4.0.29.Final</netty.version>
     <parquet.version>1.8.1</parquet.version>

http://git-wip-us.apache.org/repos/asf/hive/blob/f5b22502/ql/pom.xml
----------------------------------------------------------------------
diff --git a/ql/pom.xml b/ql/pom.xml
index 40a216b..9c17695 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -29,6 +29,7 @@
 
   <properties>
     <hive.path.to.root>..</hive.path.to.root>
+    <powermock.version>1.6.6</powermock.version>
   </properties>
 
   <dependencies>
@@ -735,6 +736,18 @@
       <version>${hamcrest.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-module-junit4</artifactId>
+      <version>${powermock.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-api-mockito</artifactId>
+      <version>${powermock.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/hive/blob/f5b22502/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index f277284..71db332 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -75,7 +75,27 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
       FileSystem srcFs = fromPath.getFileSystem(conf);
       dstFs = toPath.getFileSystem(conf);
 
-      List<FileStatus> srcFiles = new ArrayList<FileStatus>();
+      // This should only be true for copy tasks created for functions; otherwise there should
+      // never be a CM URI in the from path.
+      if (ReplChangeManager.isCMFileUri(fromPath, srcFs)) {
+        String[] result = ReplChangeManager.getFileWithChksumFromURI(fromPath.toString());
+        Path sourcePath = ReplChangeManager
+            .getFileStatus(new Path(result[0]), result[1], conf)
+            .getPath();
+        if (FileUtils.copy(
+            sourcePath.getFileSystem(conf), sourcePath,
+            dstFs, toPath,
+            false, false, conf
+        )) {
+          return 0;
+        } else {
+          console.printError("Failed to copy: '" + fromPath.toString() + "' to: '"
+              + toPath.toString() + "'");
+          return 1;
+        }
+      }
+
+      List<FileStatus> srcFiles = new ArrayList<>();
       FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(srcFs, fromPath);
       LOG.debug("ReplCopyTasks srcs=" + (srcs == null ? "null" : srcs.length));
       if (! rwork.getReadListFromInput()){

http://git-wip-us.apache.org/repos/asf/hive/blob/f5b22502/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 1e6b192..adcdc12 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -47,21 +47,20 @@ import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler;
 import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandlerFactory;
-import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
-import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
 import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
 import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
-import org.apache.hadoop.hive.ql.plan.CreateFunctionDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
-import org.apache.hadoop.hive.ql.plan.FunctionWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,6 +76,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_FROM;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT;
@@ -377,11 +377,13 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
           continue;
         }
 
-        Path functionMetadataRoot =
-            new Path(new Path(functionsRoot, functionName), FUNCTION_METADATA_DIR_NAME);
+        Path functionRoot = new Path(functionsRoot, functionName);
+        Path functionMetadataRoot = new Path(functionRoot, FUNCTION_METADATA_DIR_NAME);
         try (JsonWriter jsonWriter = new JsonWriter(functionMetadataRoot.getFileSystem(conf),
             functionMetadataRoot)) {
-          new FunctionSerializer(tuple.object).writeTo(jsonWriter, tuple.replicationSpec);
+          FunctionSerializer serializer =
+              new FunctionSerializer(tuple.object, conf);
+          serializer.writeTo(jsonWriter, tuple.replicationSpec);
         }
         REPL_STATE_LOG.info("Repl Dump: Dumped metadata for function: {}", functionName);
       }
@@ -839,31 +841,23 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         .getValidatedURI(conf, stripQuotes(functionDir.getPath().toUri().toString()));
     Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath());
 
-    FileSystem fs = FileSystem.get(fromURI, conf);
-    inputs.add(toReadEntity(fromPath, conf));
-
     try {
-      MetaData metaData = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME));
-      ReplicationSpec replicationSpec = metaData.getReplicationSpec();
-      if (replicationSpec.isNoop()) {
-        // nothing to do here, silently return.
-        return;
-      }
-      CreateFunctionDesc desc = new CreateFunctionDesc(
-          dbName + "." + metaData.function.getFunctionName(),
-          false,
-          metaData.function.getClassName(),
-          metaData.function.getResourceUris()
+      CreateFunctionHandler handler = new CreateFunctionHandler();
+      List<Task<? extends Serializable>> tasksList = handler.handle(
+          new MessageHandler.Context(
+              dbName, null, fromPath.toString(), createDbTask, null, conf, db,
+              null, LOG)
       );
 
-      Task<FunctionWork> currentTask = TaskFactory.get(new FunctionWork(desc), conf);
-      if (createDbTask != null) {
-        createDbTask.addDependentTask(currentTask);
+      tasksList.forEach(task -> {
+        createDbTask.addDependentTask(task);
         LOG.debug("Added {}:{} as a precursor of {}:{}",
-            createDbTask.getClass(), createDbTask.getId(), currentTask.getClass(),
-            currentTask.getId());
-      }
-    } catch (IOException e) {
+            createDbTask.getClass(), createDbTask.getId(), task.getClass(),
+            task.getId());
+
+      });
+      inputs.addAll(handler.readEntities());
+    } catch (Exception e) {
       throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/f5b22502/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/PathBuilder.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/PathBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/PathBuilder.java
new file mode 100644
index 0000000..05b9821
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/PathBuilder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+
+/**
+ * Path builder to stitch together paths from different components; might be useful as a
+ * utility across the replication semantic analyzer, at least.
+ */
+public class PathBuilder {
+  private ArrayList<String> descendants = new ArrayList<>();
+  private String basePath;
+
+  public PathBuilder(String basePath) {
+    this.basePath = basePath;
+  }
+
+  public PathBuilder addDescendant(String path) {
+    descendants.add(path);
+    return this;
+  }
+
+  public Path build() {
+    Path result = new Path(this.basePath);
+    for (String descendant : descendants) {
+      result = new Path(result, descendant);
+    }
+    return result;
+  }
+
+  public static Path fullyQualifiedHDFSUri(Path input, FileSystem hdfsFileSystem)
+      throws SemanticException {
+    URI uri = input.toUri();
+    String scheme = hdfsFileSystem.getScheme();
+    String authority = hdfsFileSystem.getUri().getAuthority();
+    String path = uri.getPath();
+    try {
+      return new Path(new URI(scheme, authority, path, null, null));
+    } catch (URISyntaxException e) {
+      throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+    }
+  }
+}
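
A usage sketch for the new builder, assuming an HDFS default filesystem (the names below are
illustrative; fullyQualifiedHDFSUri throws SemanticException, so call it from a method that
declares or handles it):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;

    FileSystem fs = FileSystem.get(new Configuration());
    Path nested = new PathBuilder("/user/hive/repl/functions")
        .addDescendant("mydb")
        .addDescendant("myfunction")
        .build();              // -> /user/hive/repl/functions/mydb/myfunction
    Path qualified = PathBuilder.fullyQualifiedHDFSUri(nested, fs);
    // -> hdfs://<namenode authority>/user/hive/repl/functions/mydb/myfunction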

http://git-wip-us.apache.org/repos/asf/hive/blob/f5b22502/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
index d04c7b5..ee3432c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- 
+
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -23,11 +23,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
 
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
 class CreateFunctionHandler extends AbstractEventHandler {
   CreateFunctionHandler(NotificationEvent event) {
     super(event);
@@ -42,7 +41,7 @@ class CreateFunctionHandler extends AbstractEventHandler {
     FileSystem fileSystem = metadataPath.getFileSystem(withinContext.hiveConf);
 
     try (JsonWriter jsonWriter = new JsonWriter(fileSystem, metadataPath)) {
-      new FunctionSerializer(createFunctionMessage.getFunctionObj())
+      new FunctionSerializer(createFunctionMessage.getFunctionObj(), withinContext.hiveConf)
           .writeTo(jsonWriter, withinContext.replicationSpec);
     }
     withinContext.createDmd(this).write();

http://git-wip-us.apache.org/repos/asf/hive/blob/f5b22502/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
index 5dc7023..6c2a402 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
@@ -17,21 +17,31 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.dump.io;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.ResourceUri;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TJSONProtocol;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 public class FunctionSerializer implements JsonWriter.Serializer {
-  public static final String FIELD_NAME="function";
+  public static final String FIELD_NAME = "function";
   private Function function;
+  private HiveConf hiveConf;
 
-  public FunctionSerializer(Function function) {
+  public FunctionSerializer(Function function, HiveConf hiveConf) {
+    this.hiveConf = hiveConf;
     this.function = function;
   }
 
@@ -39,9 +49,33 @@ public class FunctionSerializer implements JsonWriter.Serializer {
   public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
       throws SemanticException, IOException {
     TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+    List<ResourceUri> resourceUris = new ArrayList<>();
+    for (ResourceUri uri : function.getResourceUris()) {
+      Path inputPath = new Path(uri.getUri());
+      if ("hdfs".equals(inputPath.toUri().getScheme())) {
+        FileSystem fileSystem = inputPath.getFileSystem(hiveConf);
+        Path qualifiedUri = PathBuilder.fullyQualifiedHDFSUri(inputPath, fileSystem);
+        String checkSum = ReplChangeManager.getChksumString(qualifiedUri, fileSystem);
+        String newFileUri = ReplChangeManager.encodeFileUri(qualifiedUri.toString(), checkSum);
+        resourceUris.add(new ResourceUri(uri.getResourceType(), newFileUri));
+      } else {
+        resourceUris.add(uri);
+      }
+    }
+    Function copyObj = new Function(this.function);
+    if (!resourceUris.isEmpty()) {
+      assert resourceUris.size() == this.function.getResourceUris().size();
+      copyObj.setResourceUris(resourceUris);
+    }
+
     try {
+      // This is required; otherwise the correct work object won't be created on repl load.
+      writer.jsonGenerator.writeStringField(ReplicationSpec.KEY.REPL_SCOPE.toString(),
+          "all");
+      writer.jsonGenerator.writeStringField(ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+          additionalPropertiesProvider.getCurrentReplicationState());
       writer.jsonGenerator
-          .writeStringField(FIELD_NAME, serializer.toString(function, UTF_8));
+          .writeStringField(FIELD_NAME, serializer.toString(copyObj, UTF_8));
     } catch (TException e) {
       throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
     }
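To illustrate the serializer change above: each HDFS resource URI is fully qualified and then rewritten by ReplChangeManager.encodeFileUri with its checksum, while non-HDFS URIs pass through untouched. The '#<checksum>' suffix shown here is an assumption based on the test fixture later in this patch:

    // hypothetical resource URI on the primary warehouse:
    //   hdfs://primary:8020/udfs/my-udf.jar
    // after fullyQualifiedHDFSUri + getChksumString + encodeFileUri:
    //   hdfs://primary:8020/udfs/my-udf.jar#e094828883
    // a local URI such as file:///tmp/my-udf.jar is serialized as-is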

http://git-wip-us.apache.org/repos/asf/hive/blob/f5b22502/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetaData.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetaData.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetaData.java
index fc02dfd..17d8ab2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetaData.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetaData.java
@@ -37,7 +37,7 @@ public class MetaData {
     this(null, null, null, new ReplicationSpec(), null);
   }
 
-  MetaData(Database db, Table table, Iterable<Partition> partitions,
+  public MetaData(Database db, Table table, Iterable<Partition> partitions,
       ReplicationSpec replicationSpec, Function function) {
     this.db = db;
     this.table = table;

http://git-wip-us.apache.org/repos/asf/hive/blob/f5b22502/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
index 8b6179b..452f506 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
@@ -18,51 +18,177 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.load.message;
 
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.api.ResourceUri;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.FunctionUtils;
+import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
 import org.apache.hadoop.hive.ql.plan.CreateFunctionDesc;
+import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.apache.hadoop.hive.ql.plan.FunctionWork;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.toReadEntity;
+
 public class CreateFunctionHandler extends AbstractMessageHandler {
   @Override
   public List<Task<? extends Serializable>> handle(Context context)
       throws SemanticException {
     try {
-      FileSystem fs = FileSystem.get(new Path(context.location).toUri(), context.hiveConf);
-      MetaData metadata;
+      FunctionDescBuilder builder = new FunctionDescBuilder(context);
+      CreateFunctionDesc descToLoad = builder.build();
+      context.log.debug("Loading function desc : {}", descToLoad.toString());
+      Task<FunctionWork> createTask = TaskFactory.get(
+          new FunctionWork(descToLoad), context.hiveConf
+      );
+      context.log.debug("Added create function task : {}:{},{}", createTask.getId(),
+          descToLoad.getFunctionName(), descToLoad.getClassName());
+      // This null check is done because the same class handles both incremental and bootstrap
+      // replication scenarios for create function. During bootstrap we do not have an event id
+      // for this event, only the time at which bootstrap started, so we pass in a null dmd for
+      // bootstrap. There should be a better way to do this, but it might require a lot of changes
+      // across different handlers; unless this turns out to be a common pattern, leaving it here.
+      if (context.dmd != null) {
+        databasesUpdated.put(builder.destinationDbName, context.dmd.getEventTo());
+      }
+      readEntitySet.add(toReadEntity(new Path(context.location), context.hiveConf));
+      if (builder.replCopyTasks.isEmpty()) {
+        // repl copy only happens for jars on HDFS, not otherwise.
+        return Collections.singletonList(createTask);
+      } else {
+        /**
+         * Notes on how task dependencies work:
+         * All root tasks are executed in parallel. For bootstrap replication there should be
+         * only one root task, which creates the db; incremental replication can have multiple
+         * (have to verify). A task has children, which are queued for execution after the
+         * parent has finished. A one-to-one dependency can be satisfied by adding the child to
+         * the given task, recursively wherever the relation holds. For many-to-one, create a
+         * barrier task that is a child of every item in the 'many' dependencies, make the 'one'
+         * dependency a child of the barrier task, and add the 'many' tasks as roots. The
+         * execution environment ensures the barrier task does not run until all of its parents
+         * are complete, i.e. when the last copy task finishes, at which point the child of the
+         * barrier task is picked up.
+         */
+        Task<? extends Serializable> barrierTask =
+            TaskFactory.get(new DependencyCollectionWork(), context.hiveConf);
+        builder.replCopyTasks.forEach(t -> t.addDependentTask(barrierTask));
+        barrierTask.addDependentTask(createTask);
+        return builder.replCopyTasks;
+      }
+    } catch (Exception e) {
+      throw (e instanceof SemanticException)
+          ? (SemanticException) e
+          : new SemanticException("Error reading message members", e);
+    }
+  }
+
+  private static class FunctionDescBuilder {
+    private final Context context;
+    private final MetaData metadata;
+    private final String destinationDbName;
+    private final List<Task<?>> replCopyTasks = new ArrayList<>();
+
+    private FunctionDescBuilder(Context context) throws SemanticException {
+      this.context = context;
       try {
+        FileSystem fs = FileSystem.get(new Path(context.location).toUri(), context.hiveConf);
         metadata = EximUtil.readMetaData(fs, new Path(context.location, EximUtil.METADATA_NAME));
       } catch (IOException e) {
         throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
       }
+      destinationDbName = context.isDbNameEmpty() ? metadata.function.getDbName() : context.dbName;
+    }
 
-      String dbName = context.isDbNameEmpty() ? metadata.function.getDbName() : context.dbName;
-      CreateFunctionDesc desc = new CreateFunctionDesc(
-          FunctionUtils.qualifyFunctionName(metadata.function.getFunctionName(), dbName), false,
-          metadata.function.getClassName(), metadata.function.getResourceUris()
+    private CreateFunctionDesc build() {
+      replCopyTasks.clear();
+      PrimaryToReplicaResourceFunction conversionFunction =
+          new PrimaryToReplicaResourceFunction(context, metadata, destinationDbName);
+      // We explicitly create an immutable list here because it forces the guava library to run
+      // the transformations eagerly instead of lazily. The function used for the transformation
+      // also creates the corresponding replCopyTasks, which cannot be created lazily: the query
+      // plan must be complete before execution starts and must not be modified while the driver
+      // is executing it.
+      List<ResourceUri> transformedUris = ImmutableList.copyOf(
+          Lists.transform(metadata.function.getResourceUris(), conversionFunction)
+      );
+      replCopyTasks.addAll(conversionFunction.replCopyTasks);
+      String fullQualifiedFunctionName = FunctionUtils.qualifyFunctionName(
+          metadata.function.getFunctionName(), destinationDbName
       );
+      return new CreateFunctionDesc(
+          fullQualifiedFunctionName, false, metadata.function.getClassName(), transformedUris
+      );
+    }
+  }
 
-      Task<FunctionWork> task = TaskFactory.get(new FunctionWork(desc), context.hiveConf);
-      context.log.debug("Added create function task : {}:{},{}", task.getId(),
-          metadata.function.getFunctionName(), metadata.function.getClassName());
-      databasesUpdated.put(dbName, context.dmd.getEventTo());
-      return Collections.singletonList(task);
-    } catch (Exception e) {
-      throw (e instanceof SemanticException)
-          ? (SemanticException) e
-          : new SemanticException("Error reading message members", e);
+  static class PrimaryToReplicaResourceFunction
+      implements Function<ResourceUri, ResourceUri> {
+    private final Context context;
+    private final MetaData metadata;
+    private final List<Task<?>> replCopyTasks = new ArrayList<>();
+    private final String functionsRootDir;
+    private String destinationDbName;
+
+    PrimaryToReplicaResourceFunction(Context context, MetaData metadata,
+        String destinationDbName) {
+      this.context = context;
+      this.metadata = metadata;
+      this.destinationDbName = destinationDbName;
+      this.functionsRootDir = context.hiveConf.getVar(HiveConf.ConfVars.REPL_FUNCTIONS_ROOT_DIR);
+    }
+
+    @Override
+    public ResourceUri apply(ResourceUri resourceUri) {
+      try {
+        return resourceUri.getUri().toLowerCase().startsWith("hdfs:")
+            ? destinationResourceUri(resourceUri)
+            : resourceUri;
+      } catch (IOException | SemanticException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    /**
+     * The destination also includes the current timestamp to randomize the placement of the jar
+     * for a given function. This allows a CREATE / DROP / CREATE of the same function, with the
+     * same name and jar names but updated binaries, across the two creates.
+     */
+    ResourceUri destinationResourceUri(ResourceUri resourceUri)
+        throws IOException, SemanticException {
+      String sourceUri = resourceUri.getUri();
+      String[] split = sourceUri.split(Path.SEPARATOR);
+      PathBuilder pathBuilder = new PathBuilder(functionsRootDir);
+      Path qualifiedDestinationPath = PathBuilder.fullyQualifiedHDFSUri(
+          pathBuilder
+              .addDescendant(destinationDbName.toLowerCase())
+              .addDescendant(metadata.function.getFunctionName().toLowerCase())
+              .addDescendant(String.valueOf(System.nanoTime()))
+              .addDescendant(ReplChangeManager.getFileWithChksumFromURI(split[split.length - 1])[0])
+              .build(),
+          FileSystem.get(context.hiveConf)
+      );
+
+      Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
+          metadata.getReplicationSpec(), new Path(sourceUri), qualifiedDestinationPath,
+          context.hiveConf
+      );
+      replCopyTasks.add(copyTask);
+      ResourceUri destinationUri =
+          new ResourceUri(resourceUri.getResourceType(), qualifiedDestinationPath.toString());
+      context.log.debug("copy source uri : {} to destination uri: {}", sourceUri, destinationUri);
+      return destinationUri;
     }
   }
 }
\ No newline at end of file
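A minimal sketch of the many-to-one barrier wiring described in the comment above, with hypothetical tasks copyTaskA, copyTaskB and createFunctionTask, and an initialized HiveConf conf:

    // the barrier may only run once every one of its parent copy tasks has finished
    Task<? extends Serializable> barrier =
        TaskFactory.get(new DependencyCollectionWork(), conf);
    copyTaskA.addDependentTask(barrier);   // copyTaskA and copyTaskB are root tasks,
    copyTaskB.addDependentTask(barrier);   // executed in parallel by the driver
    barrier.addDependentTask(createFunctionTask);
    // tasks handed back to the driver as roots: [copyTaskA, copyTaskB]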

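On the ImmutableList.copyOf usage in the builder above: Lists.transform returns a lazy view, so copying it into an immutable list is what actually runs the conversion function, and with it the ReplCopyTask creation, at analysis time. A sketch of the distinction, with hypothetical names:

    List<ResourceUri> lazyView = Lists.transform(uris, conversionFunction); // nothing runs yet
    List<ResourceUri> eager = ImmutableList.copyOf(lazyView); // runs here, once per element
    // any side effects of conversionFunction (e.g. collecting copy tasks) have now happened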
http://git-wip-us.apache.org/repos/asf/hive/blob/f5b22502/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/PrimaryToReplicaResourceFunctionTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/PrimaryToReplicaResourceFunctionTest.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/PrimaryToReplicaResourceFunctionTest.java
new file mode 100644
index 0000000..7a87701
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/PrimaryToReplicaResourceFunctionTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.ResourceType;
+import org.apache.hadoop.hive.metastore.api.ResourceUri;
+import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.Mock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import static org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler.PrimaryToReplicaResourceFunction;
+import static org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler.Context;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.mockito.Matchers.any;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ PrimaryToReplicaResourceFunction.class, FileSystem.class, ReplCopyTask.class,
+    System.class })
+public class PrimaryToReplicaResourceFunctionTest {
+
+  private PrimaryToReplicaResourceFunction function;
+  @Mock
+  private HiveConf hiveConf;
+  @Mock
+  private Function functionObj;
+  @Mock
+  private FileSystem mockFs;
+  private static Logger logger =
+      LoggerFactory.getLogger(PrimaryToReplicaResourceFunctionTest.class);
+
+  @Before
+  public void setup() {
+    MetaData metadata = new MetaData(null, null, null, null, functionObj);
+    Context context =
+        new Context("primaryDb", null, null, null, null, hiveConf, null, null, logger);
+    when(hiveConf.getVar(HiveConf.ConfVars.REPL_FUNCTIONS_ROOT_DIR))
+        .thenReturn("/someBasePath/withADir/");
+    function = new PrimaryToReplicaResourceFunction(context, metadata, "replicaDbName");
+  }
+
+  @Test
+  public void createDestinationPath() throws IOException, SemanticException, URISyntaxException {
+    mockStatic(FileSystem.class);
+    when(FileSystem.get(any(Configuration.class))).thenReturn(mockFs);
+    when(mockFs.getScheme()).thenReturn("hdfs");
+    when(mockFs.getUri()).thenReturn(new URI("hdfs", "somehost:9000", null, null, null));
+    mockStatic(System.class);
+    when(System.nanoTime()).thenReturn(Long.MAX_VALUE);
+    when(functionObj.getFunctionName()).thenReturn("someFunctionName");
+    mockStatic(ReplCopyTask.class);
+    Task mock = mock(Task.class);
+    when(ReplCopyTask.getLoadCopyTask(any(ReplicationSpec.class), any(Path.class), any(Path.class),
+        any(HiveConf.class))).thenReturn(mock);
+
+    ResourceUri resourceUri = function.destinationResourceUri(new ResourceUri(ResourceType.JAR,
+        "hdfs://localhost:9000/user/someplace/ab.jar#e094828883"));
+
+    assertThat(resourceUri.getUri(),
+        is(equalTo(
+            "hdfs://somehost:9000/someBasePath/withADir/replicaDbName/somefunctionname/" + String
+                .valueOf(Long.MAX_VALUE) + "/ab.jar")));
+  }
+}
\ No newline at end of file
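For reference, the replica-side layout this test asserts, where the System.nanoTime() component is what lets a CREATE / DROP / CREATE of the same function and jar land in distinct directories:

    <hive.repl.replica.functions.root.dir>/<dbName>/<functionName>/<nanoTime>/<jarName>
    e.g. hdfs://somehost:9000/someBasePath/withADir/replicaDbName/somefunctionname/<nanos>/ab.jar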

