falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject falcon git commit: FALCON-2072 Hive2 URLs in Falcon should allow additional configuration elements in the URL
Date Wed, 03 Aug 2016 22:06:29 GMT
Repository: falcon
Updated Branches:
  refs/heads/master da767c2f6 -> b0efc6fbf


FALCON-2072 Hive2 URLs in Falcon should allow additional configuration elements in the URL

Author: Sowmya Ramesh <sramesh@hortonworks.com>

Reviewers: Venkat Ranganathan <venkat@hortonworks.com>, Balu Vellanki <balu@apache.org>

Closes #250 from sowmyaramesh/FALCON-2072


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/b0efc6fb
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/b0efc6fb
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/b0efc6fb

Branch: refs/heads/master
Commit: b0efc6fbf97c109a81b490339d2521c21fffe37e
Parents: da767c2
Author: Sowmya Ramesh <sramesh@hortonworks.com>
Authored: Wed Aug 3 15:06:19 2016 -0700
Committer: bvellanki <bvellanki@hortonworks.com>
Committed: Wed Aug 3 15:06:19 2016 -0700

----------------------------------------------------------------------
 .../main/META/hive-mirroring-properties.json    | 16 ++++-
 .../runtime/hive-mirroring-secure-workflow.xml  | 12 ++++
 .../runtime/hive-mirroring-workflow.xml         | 12 ++++
 .../java/org/apache/falcon/hive/HiveDRArgs.java |  2 +
 .../org/apache/falcon/hive/util/EventUtils.java | 49 ++++++++++++--
 .../falcon/hive/util/HiveDRStatusStore.java     | 24 +++----
 .../apache/falcon/hive/util/EventUtilsTest.java | 71 ++++++++++++++++++++
 .../mirroring/hive/HiveMirroringExtension.java  | 18 ++---
 .../hive/HiveMirroringExtensionProperties.java  |  2 +
 9 files changed, 178 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/b0efc6fb/addons/extensions/hive-mirroring/src/main/META/hive-mirroring-properties.json
----------------------------------------------------------------------
diff --git a/addons/extensions/hive-mirroring/src/main/META/hive-mirroring-properties.json
b/addons/extensions/hive-mirroring/src/main/META/hive-mirroring-properties.json
index e019e68..686ce94 100644
--- a/addons/extensions/hive-mirroring/src/main/META/hive-mirroring-properties.json
+++ b/addons/extensions/hive-mirroring/src/main/META/hive-mirroring-properties.json
@@ -94,10 +94,16 @@
         {
             "propertyName":"sourceHiveServer2Uri",
             "required":true,
-            "description":"Hive2 server end point",
+            "description":"Hive2 server end point. If Zookeeper discovery mode is enabled
zookeeper_ensemble is expected",
             "example":"hive2://localhost:10000"
         },
         {
+            "propertyName":"sourceHiveServer2ExtraOpts",
+            "required":false,
+            "description":"Extra opts required when SSL is enabled, Http mode and when zookeeper
discovery is used",
+            "example":"serviceDiscoveryMode=zooKeeper; zooKeeperNamespace=<hiveserver2_namespace>"
+        },
+        {
             "propertyName":"sourceDatabases",
             "required":true,
             "description":"For DB level replication specify multiple comma separated databases
to replicate",
@@ -130,10 +136,16 @@
         {
             "propertyName":"targetHiveServer2Uri",
             "required":true,
-            "description":"Hive2 server end point",
+            "description":"Hive2 server end point. If Zookeeper discovery mode is enabled
zookeeper_ensemble is expected",
             "example":"hive2://localhost:10000"
         },
         {
+            "propertyName":"targetHiveServer2ExtraOpts",
+            "required":false,
+            "description":"Extra opts required when SSL is enabled, Http mode and when zookeeper
discovery is used",
+            "example":"serviceDiscoveryMode=zooKeeper; zooKeeperNamespace=<hiveserver2_namespace>"
+        },
+        {
             "propertyName":"targetStagingPath",
             "required":false,
             "description":"Staging path on target",

http://git-wip-us.apache.org/repos/asf/falcon/blob/b0efc6fb/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-secure-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-secure-workflow.xml
b/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-secure-workflow.xml
index 63e9a67..6ccea3a 100644
--- a/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-secure-workflow.xml
+++ b/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-secure-workflow.xml
@@ -102,6 +102,8 @@
             <arg>${sourceMetastoreUri}</arg>
             <arg>-sourceHiveServer2Uri</arg>
             <arg>${sourceHiveServer2Uri}</arg>
+            <arg>-sourceHiveServer2ExtraOpts</arg>
+            <arg>${sourceHiveServer2ExtraOpts}</arg>
             <arg>-sourceDatabases</arg>
             <arg>${sourceDatabases}</arg>
             <arg>-sourceTables</arg>
@@ -122,6 +124,8 @@
             <arg>${targetMetastoreUri}</arg>
             <arg>-targetHiveServer2Uri</arg>
             <arg>${targetHiveServer2Uri}</arg>
+            <arg>-targetHiveServer2ExtraOpts</arg>
+            <arg>${targetHiveServer2ExtraOpts}</arg>
             <arg>-targetStagingPath</arg>
             <arg>${targetStagingPath}</arg>
             <arg>-targetNN</arg>
@@ -200,6 +204,8 @@
             <arg>${sourceMetastoreUri}</arg>
             <arg>-sourceHiveServer2Uri</arg>
             <arg>${sourceHiveServer2Uri}</arg>
+            <arg>-sourceHiveServer2ExtraOpts</arg>
+            <arg>${sourceHiveServer2ExtraOpts}</arg>
             <arg>-sourceDatabases</arg>
             <arg>${sourceDatabases}</arg>
             <arg>-sourceTables</arg>
@@ -220,6 +226,8 @@
             <arg>${targetMetastoreUri}</arg>
             <arg>-targetHiveServer2Uri</arg>
             <arg>${targetHiveServer2Uri}</arg>
+            <arg>-targetHiveServer2ExtraOpts</arg>
+            <arg>${targetHiveServer2ExtraOpts}</arg>
             <arg>-targetStagingPath</arg>
             <arg>${targetStagingPath}</arg>
             <arg>-targetNN</arg>
@@ -302,6 +310,8 @@
             <arg>${sourceMetastoreUri}</arg>
             <arg>-sourceHiveServer2Uri</arg>
             <arg>${sourceHiveServer2Uri}</arg>
+            <arg>-sourceHiveServer2ExtraOpts</arg>
+            <arg>${sourceHiveServer2ExtraOpts}</arg>
             <arg>-sourceDatabases</arg>
             <arg>${sourceDatabases}</arg>
             <arg>-sourceTables</arg>
@@ -322,6 +332,8 @@
             <arg>${targetMetastoreUri}</arg>
             <arg>-targetHiveServer2Uri</arg>
             <arg>${targetHiveServer2Uri}</arg>
+            <arg>-targetHiveServer2ExtraOpts</arg>
+            <arg>${targetHiveServer2ExtraOpts}</arg>
             <arg>-targetStagingPath</arg>
             <arg>${targetStagingPath}</arg>
             <arg>-targetNN</arg>

http://git-wip-us.apache.org/repos/asf/falcon/blob/b0efc6fb/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-workflow.xml
b/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-workflow.xml
index 4f6eec5..5336bda 100644
--- a/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-workflow.xml
+++ b/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-workflow.xml
@@ -52,6 +52,8 @@
             <arg>${sourceMetastoreUri}</arg>
             <arg>-sourceHiveServer2Uri</arg>
             <arg>${sourceHiveServer2Uri}</arg>
+            <arg>-sourceHiveServer2ExtraOpts</arg>
+            <arg>${sourceHiveServer2ExtraOpts}</arg>
             <arg>-sourceDatabases</arg>
             <arg>${sourceDatabases}</arg>
             <arg>-sourceTables</arg>
@@ -66,6 +68,8 @@
             <arg>${targetMetastoreUri}</arg>
             <arg>-targetHiveServer2Uri</arg>
             <arg>${targetHiveServer2Uri}</arg>
+            <arg>-targetHiveServer2ExtraOpts</arg>
+            <arg>${targetHiveServer2ExtraOpts}</arg>
             <arg>-targetStagingPath</arg>
             <arg>${targetStagingPath}</arg>
             <arg>-targetNN</arg>
@@ -128,6 +132,8 @@
             <arg>${sourceMetastoreUri}</arg>
             <arg>-sourceHiveServer2Uri</arg>
             <arg>${sourceHiveServer2Uri}</arg>
+            <arg>-sourceHiveServer2ExtraOpts</arg>
+            <arg>${sourceHiveServer2ExtraOpts}</arg>
             <arg>-sourceDatabases</arg>
             <arg>${sourceDatabases}</arg>
             <arg>-sourceTables</arg>
@@ -142,6 +148,8 @@
             <arg>${targetMetastoreUri}</arg>
             <arg>-targetHiveServer2Uri</arg>
             <arg>${targetHiveServer2Uri}</arg>
+            <arg>-targetHiveServer2ExtraOpts</arg>
+            <arg>${targetHiveServer2ExtraOpts}</arg>
             <arg>-targetStagingPath</arg>
             <arg>${targetStagingPath}</arg>
             <arg>-targetNN</arg>
@@ -208,6 +216,8 @@
             <arg>${sourceMetastoreUri}</arg>
             <arg>-sourceHiveServer2Uri</arg>
             <arg>${sourceHiveServer2Uri}</arg>
+            <arg>-sourceHiveServer2ExtraOpts</arg>
+            <arg>${sourceHiveServer2ExtraOpts}</arg>
             <arg>-sourceDatabases</arg>
             <arg>${sourceDatabases}</arg>
             <arg>-sourceTables</arg>
@@ -222,6 +232,8 @@
             <arg>${targetMetastoreUri}</arg>
             <arg>-targetHiveServer2Uri</arg>
             <arg>${targetHiveServer2Uri}</arg>
+            <arg>-targetHiveServer2ExtraOpts</arg>
+            <arg>${targetHiveServer2ExtraOpts}</arg>
             <arg>-targetStagingPath</arg>
             <arg>${targetStagingPath}</arg>
             <arg>-targetNN</arg>

http://git-wip-us.apache.org/repos/asf/falcon/blob/b0efc6fb/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
index d891487..9decd30 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
@@ -30,6 +30,7 @@ public enum HiveDRArgs {
     SOURCE_CLUSTER("sourceCluster", "source cluster"),
     SOURCE_METASTORE_URI("sourceMetastoreUri", "source meta store uri"),
     SOURCE_HS2_URI("sourceHiveServer2Uri", "source HS2 uri"),
+    SOURCE_HS2_URI_EXTRA_OPTS("sourceHiveServer2ExtraOpts", "source HS2 extra opts", false),
     SOURCE_DATABASES("sourceDatabases", "comma source databases"),
     SOURCE_DATABASE("sourceDatabase", "First source database"),
     SOURCE_TABLES("sourceTables", "comma source tables"),
@@ -47,6 +48,7 @@ public enum HiveDRArgs {
     // target meta store details
     TARGET_METASTORE_URI("targetMetastoreUri", "source meta store uri"),
     TARGET_HS2_URI("targetHiveServer2Uri", "source meta store uri"),
+    TARGET_HS2_URI_EXTRA_OPTS("targetHiveServer2ExtraOpts", "target HS2 extra opts", false),
 
     TARGET_STAGING_PATH("targetStagingPath", "source staging path for data", false),
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/b0efc6fb/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
index 05b5f96..492c70e 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
@@ -60,12 +60,14 @@ public class EventUtils {
 
     private Configuration conf = null;
     private String sourceHiveServer2Uri = null;
+    private String sourceHS2UriExtraOptions = null;
     private String sourceDatabase = null;
     private String sourceNN = null;
     private String sourceNNKerberosPrincipal = null;
     private String jobNN = null;
     private String jobNNKerberosPrincipal = null;
     private String targetHiveServer2Uri = null;
+    private String targetHS2UriExtraOptions = null;
     private String sourceStagingPath = null;
     private String targetStagingPath = null;
     private String targetNN = null;
@@ -91,6 +93,7 @@ public class EventUtils {
     public EventUtils(Configuration conf) {
         this.conf = conf;
         sourceHiveServer2Uri = conf.get(HiveDRArgs.SOURCE_HS2_URI.getName());
+        sourceHS2UriExtraOptions = conf.get(HiveDRArgs.SOURCE_HS2_URI_EXTRA_OPTS.getName());
         sourceDatabase = conf.get(HiveDRArgs.SOURCE_DATABASE.getName());
         sourceNN = conf.get(HiveDRArgs.SOURCE_NN.getName());
         sourceNNKerberosPrincipal = conf.get(HiveDRArgs.SOURCE_NN_KERBEROS_PRINCIPAL.getName());
@@ -98,6 +101,7 @@ public class EventUtils {
         jobNN = conf.get(HiveDRArgs.JOB_CLUSTER_NN.getName());
         jobNNKerberosPrincipal = conf.get(HiveDRArgs.JOB_CLUSTER_NN_KERBEROS_PRINCIPAL.getName());
         targetHiveServer2Uri = conf.get(HiveDRArgs.TARGET_HS2_URI.getName());
+        targetHS2UriExtraOptions = conf.get(HiveDRArgs.TARGET_HS2_URI_EXTRA_OPTS.getName());
         targetStagingPath = conf.get(HiveDRArgs.TARGET_STAGING_PATH.getName());
         targetNN = conf.get(HiveDRArgs.TARGET_NN.getName());
         targetNNKerberosPrincipal = conf.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL.getName());
@@ -122,22 +126,55 @@ public class EventUtils {
 
         if (conf.get(HiveDRArgs.EXECUTION_STAGE.getName())
                 .equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name())) {
-            String connString = JDBC_PREFIX + sourceHiveServer2Uri + "/" + sourceDatabase;
+            String authString = null;
             if (StringUtils.isNotEmpty(conf.get(HiveDRArgs.SOURCE_HIVE2_KERBEROS_PRINCIPAL.getName())))
{
-                connString += authTokenString;
+                authString = authTokenString;
             }
+
+            String connString = getSourceHS2ConnectionUrl(authString);
             sourceConnection = DriverManager.getConnection(connString, user, password.getProperty("password"));
             sourceStatement = sourceConnection.createStatement();
         } else {
-            String connString = JDBC_PREFIX + targetHiveServer2Uri + "/" + sourceDatabase;
+            String authString = null;
             if (StringUtils.isNotEmpty(conf.get(HiveDRArgs.TARGET_HIVE2_KERBEROS_PRINCIPAL.getName())))
{
-                connString += authTokenString;
+                authString = authTokenString;
             }
+            String connString = getTargetHS2ConnectionUrl(authString);
             targetConnection = DriverManager.getConnection(connString, user, password.getProperty("password"));
             targetStatement = targetConnection.createStatement();
         }
     }
 
+    private String getSourceHS2ConnectionUrl(final String authTokenString) {
+        return getHS2ConnectionUrl(sourceHiveServer2Uri, sourceDatabase,
+                authTokenString, sourceHS2UriExtraOptions);
+    }
+
+    private String getTargetHS2ConnectionUrl(final String authTokenString) {
+        return getHS2ConnectionUrl(targetHiveServer2Uri, sourceDatabase,
+                authTokenString, targetHS2UriExtraOptions);
+    }
+
+    public static String getHS2ConnectionUrl(final String hs2Uri, final String database,
+                                             final String authTokenString, final String hs2UriExtraOpts)
{
+        StringBuilder connString = new StringBuilder();
+        connString.append(JDBC_PREFIX).append(StringUtils.removeEnd(hs2Uri, "/")).append("/").append(database);
+
+        if (StringUtils.isNotBlank(authTokenString)) {
+            connString.append(authTokenString);
+        }
+
+        if (StringUtils.isNotBlank(hs2UriExtraOpts) && !("NA".equalsIgnoreCase(hs2UriExtraOpts)))
{
+            if (!hs2UriExtraOpts.startsWith(";")) {
+                connString.append(";");
+            }
+            connString.append(hs2UriExtraOpts);
+        }
+
+        LOG.info("getHS2ConnectionUrl connection uri: {}", connString);
+        return connString.toString();
+    }
+
     public void initializeFS() throws IOException {
         LOG.info("Initializing staging directory");
         sourceStagingUri = new Path(sourceNN, sourceStagingPath).toString();
@@ -152,7 +189,7 @@ public class EventUtils {
         BufferedReader in = new BufferedReader(new InputStreamReader(jobFileSystem.open(eventFileName)));
         try {
             String line;
-            while ((line=in.readLine())!=null) {
+            while ((line = in.readLine()) != null) {
                 eventString.append(line);
                 eventString.append(DelimiterUtils.NEWLINE_DELIM);
             }
@@ -327,7 +364,7 @@ public class EventUtils {
 
     public DistCpOptions getDistCpOptions() {
         // DistCpOptions expects the first argument to be a file OR a list of Paths
-        List<Path> sourceUris=new ArrayList<>();
+        List<Path> sourceUris = new ArrayList<>();
         sourceUris.add(new Path(sourceStagingUri));
         DistCpOptions distcpOptions = new DistCpOptions(sourceUris, new Path(targetStagingUri));
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/b0efc6fb/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
index 44f0989..ee459a3 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
@@ -49,8 +49,8 @@ public class HiveDRStatusStore extends DRStatusStore {
     private static final Logger LOG = LoggerFactory.getLogger(DRStatusStore.class);
     private FileSystem fileSystem;
 
-    private static final String DEFAULT_STORE_PATH = StringUtils.removeEnd
-            (DRStatusStore.BASE_DEFAULT_STORE_PATH,  File.separator) + File.separator
+    private static final String DEFAULT_STORE_PATH = StringUtils.removeEnd(
+            DRStatusStore.BASE_DEFAULT_STORE_PATH, File.separator) + File.separator
             + "hiveReplicationStatusStore" + File.separator;
 
     private static final FsPermission DEFAULT_STATUS_DIR_PERMISSION =
@@ -90,10 +90,10 @@ public class HiveDRStatusStore extends DRStatusStore {
         }
     }
 
-     /**
-        get all DB updated by the job. get all current table statuses for the DB merge the
latest repl
-        status with prev table repl statuses. If all statuses are success, store the status
as success
-        with largest eventId for the DB else store status as failure for the DB and lowest
eventId.
+    /**
+     * get all DB updated by the job. get all current table statuses for the DB merge the
latest repl
+     * status with prev table repl statuses. If all statuses are success, store the status
as success
+     * with largest eventId for the DB else store status as failure for the DB and lowest
eventId.
      */
     @Override
     public void updateReplicationStatus(String jobName, List<ReplicationStatus> statusList)
@@ -161,13 +161,13 @@ public class HiveDRStatusStore extends DRStatusStore {
             }
         } catch (IOException e) {
             throw new HiveReplicationException("Failed to delete status for Job "
-                    + jobName + " and DB "+ database, e);
+                    + jobName + " and DB " + database, e);
         }
 
     }
 
     private DBReplicationStatus getDbReplicationStatus(String source, String target, String
jobName,
-                                                       String database) throws HiveReplicationException{
+                                                       String database) throws HiveReplicationException
{
         DBReplicationStatus dbReplicationStatus = null;
         Path statusDbDirPath = getStatusDbDirPath(database);
         Path statusDirPath = getStatusDirPath(database, jobName);
@@ -253,7 +253,7 @@ public class HiveDRStatusStore extends DRStatusStore {
             while (fileIterator.hasNext()) {
                 fileList.add(fileIterator.next().getPath().toString());
             }
-            if (fileList.size() > (numFiles+1)) {
+            if (fileList.size() > (numFiles + 1)) {
                 // delete some files, as long as they are older than the time.
                 Collections.sort(fileList);
                 for (String file : fileList.subList(0, (fileList.size() - numFiles + 1)))
{
@@ -289,11 +289,11 @@ public class HiveDRStatusStore extends DRStatusStore {
     }
 
     public void checkForReplicationConflict(String newSource, String jobName,
-                                             String database, String table) throws HiveReplicationException
{
+                                            String database, String table) throws HiveReplicationException
{
         try {
             Path globPath = new Path(getStatusDbDirPath(database), "*" + File.separator +
"latest.json");
             FileStatus[] files = fileSystem.globStatus(globPath);
-            for(FileStatus file : files) {
+            for (FileStatus file : files) {
                 DBReplicationStatus dbFileStatus = new DBReplicationStatus(IOUtils.toString(
                         fileSystem.open(file.getPath())));
                 ReplicationStatus existingJob = dbFileStatus.getDatabaseStatus();
@@ -319,7 +319,7 @@ public class HiveDRStatusStore extends DRStatusStore {
                 allowed as long as the target tables are different. For example, job1 can
replicate db1.table1 and
                 job2 can replicate db1.table2.  Both jobs cannot replicate to same table.
                  */
-                for(Map.Entry<String, ReplicationStatus> entry : dbFileStatus.getTableStatuses().entrySet())
{
+                for (Map.Entry<String, ReplicationStatus> entry : dbFileStatus.getTableStatuses().entrySet())
{
                     if (table.equals(entry.getKey())) {
                         throw new HiveReplicationException("Two different jobs are trying
to replicate to same table "
                                 + entry.getKey() + ". New job = " + jobName

http://git-wip-us.apache.org/repos/asf/falcon/blob/b0efc6fb/addons/hivedr/src/test/java/org/apache/falcon/hive/util/EventUtilsTest.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/test/java/org/apache/falcon/hive/util/EventUtilsTest.java b/addons/hivedr/src/test/java/org/apache/falcon/hive/util/EventUtilsTest.java
new file mode 100644
index 0000000..2e78519
--- /dev/null
+++ b/addons/hivedr/src/test/java/org/apache/falcon/hive/util/EventUtilsTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.util;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for EventUtils.
+ */
+@Test
+public class EventUtilsTest {
+    private static final String JDBC_PREFIX = "jdbc:";
+    private static final String HS2_URI = "hive2://localhost:10000:";
+    private static final String HS2_ZK_URI = "hive2://host1.com:2181,host2.com:2181/";
+    private static final String DATABASE = "test";
+    private static final String HS2_SSL_EXTRA_OPTS = "ssl=true;"
+            + "sslTrustStore=/var/lib/security/keystores/gateway.jks;"
+            + "trustStorePassword=1234?hive.server2.transport.mode=http;hive.server2.thrift.http"
+            + ".path=gateway/primaryCLuster/hive";
+    private static final String HS2_ZK_EXTRA_OPTS = ";serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2";
+    private static final String AUTHTOKEN_STRING = ";auth=delegationToken";
+    public EventUtilsTest() {
+    }
+
+    @Test
+    public void validateHs2Uri() {
+        final String expectedUri = JDBC_PREFIX + HS2_URI + "/" + DATABASE;
+
+        Assert.assertEquals(EventUtils.getHS2ConnectionUrl(HS2_URI, DATABASE, null, null),
expectedUri);
+        Assert.assertEquals(EventUtils.getHS2ConnectionUrl(HS2_URI, DATABASE, null, "NA"),
expectedUri);
+        Assert.assertEquals(EventUtils.getHS2ConnectionUrl(HS2_URI, DATABASE, AUTHTOKEN_STRING,
+                null), expectedUri + AUTHTOKEN_STRING);
+    }
+
+    @Test
+    public void validateHs2UriWhenSSLEnabled() {
+        final String expectedUri = JDBC_PREFIX + HS2_URI + "/" + DATABASE;
+
+        Assert.assertEquals(EventUtils.getHS2ConnectionUrl(HS2_URI, DATABASE, null, HS2_SSL_EXTRA_OPTS),
+                expectedUri + ";" + HS2_SSL_EXTRA_OPTS);
+        Assert.assertEquals(EventUtils.getHS2ConnectionUrl(HS2_URI, DATABASE, AUTHTOKEN_STRING,
HS2_SSL_EXTRA_OPTS),
+                expectedUri + AUTHTOKEN_STRING + ";" + HS2_SSL_EXTRA_OPTS);
+    }
+
+    @Test
+    public void validateHs2UriWhenZKDiscoveryEnabled() {
+        final String expectedUri = JDBC_PREFIX + HS2_ZK_URI + DATABASE;
+
+        Assert.assertEquals(EventUtils.getHS2ConnectionUrl(HS2_ZK_URI, DATABASE, null, HS2_ZK_EXTRA_OPTS),
+                expectedUri + HS2_ZK_EXTRA_OPTS);
+        Assert.assertEquals(EventUtils.getHS2ConnectionUrl(HS2_ZK_URI, DATABASE, AUTHTOKEN_STRING,
HS2_ZK_EXTRA_OPTS),
+                expectedUri + AUTHTOKEN_STRING + HS2_ZK_EXTRA_OPTS);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/b0efc6fb/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java
b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java
index c3bd7a7..6c7e5da 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java
@@ -115,14 +115,13 @@ public class HiveMirroringExtension extends AbstractExtension {
     }
 
     @Override
-    public Properties getAdditionalProperties(final Properties extensionProperties) throws
FalconException {
+    public Properties getAdditionalProperties(final Properties extensionProperties)
+        throws FalconException {
         Properties additionalProperties = new Properties();
-
         String jobName = extensionProperties.getProperty(ExtensionProperties.JOB_NAME.getName());
         // Add job name as Hive DR job
         additionalProperties.put(HiveMirroringExtensionProperties.HIVE_MIRRORING_JOB_NAME.getName(),
                 jobName);
-
         // Get the first source DB
         additionalProperties.put(HiveMirroringExtensionProperties.SOURCE_DATABASE.getName(),
                 extensionProperties.getProperty(HiveMirroringExtensionProperties.SOURCE_DATABASES
@@ -235,28 +234,31 @@ public class HiveMirroringExtension extends AbstractExtension {
         if (StringUtils.isBlank(distcpMaxMaps)) {
             additionalProperties.put(HiveMirroringExtensionProperties.DISTCP_MAX_MAPS.getName(),
"1");
         }
-
         String distcpMapBandwidth = extensionProperties.getProperty(
                 HiveMirroringExtensionProperties.MAP_BANDWIDTH_IN_MB.getName());
         if (StringUtils.isBlank(distcpMapBandwidth)) {
             additionalProperties.put(HiveMirroringExtensionProperties.MAP_BANDWIDTH_IN_MB.getName(),
"100");
         }
-
         if (StringUtils.isBlank(
                 extensionProperties.getProperty(HiveMirroringExtensionProperties.TDE_ENCRYPTION_ENABLED.getName())))
{
             additionalProperties.put(HiveMirroringExtensionProperties.TDE_ENCRYPTION_ENABLED.getName(),
"false");
         }
-
         if (StringUtils.isBlank(
                 extensionProperties.getProperty(HiveMirroringExtensionProperties.SOURCE_STAGING_PATH.getName())))
{
             additionalProperties.put(HiveMirroringExtensionProperties.SOURCE_STAGING_PATH.getName(),
NOT_APPLICABLE);
         }
-
         if (StringUtils.isBlank(
                 extensionProperties.getProperty(HiveMirroringExtensionProperties.TARGET_STAGING_PATH.getName())))
{
             additionalProperties.put(HiveMirroringExtensionProperties.TARGET_STAGING_PATH.getName(),
NOT_APPLICABLE);
         }
-
+        if (StringUtils.isBlank(
+                extensionProperties.getProperty(HiveMirroringExtensionProperties.SOURCE_HS2_EXTRA_OPTS.getName())))
{
+            additionalProperties.put(HiveMirroringExtensionProperties.SOURCE_HS2_EXTRA_OPTS.getName(),
NOT_APPLICABLE);
+        }
+        if (StringUtils.isBlank(
+                extensionProperties.getProperty(HiveMirroringExtensionProperties.TARGET_HS2_EXTRA_OPTS.getName())))
{
+            additionalProperties.put(HiveMirroringExtensionProperties.TARGET_HS2_EXTRA_OPTS.getName(),
NOT_APPLICABLE);
+        }
         return additionalProperties;
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/b0efc6fb/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java
b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java
index 828817b..2276d1c 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java
@@ -26,6 +26,7 @@ public enum HiveMirroringExtensionProperties {
     SOURCE_CLUSTER("sourceCluster", "Replication source cluster name"),
     SOURCE_METASTORE_URI("sourceMetastoreUri", "Source Hive metastore uri", false),
     SOURCE_HS2_URI("sourceHiveServer2Uri", "Source HS2 uri"),
+    SOURCE_HS2_EXTRA_OPTS("sourceHiveServer2ExtraOpts", "Source HS2 extra opts", false),
     SOURCE_DATABASES("sourceDatabases", "List of databases to replicate"),
     SOURCE_DATABASE("sourceDatabase", "Database to verify the setup connection", false),
     SOURCE_TABLES("sourceTables", "List of tables to replicate", false),
@@ -40,6 +41,7 @@ public enum HiveMirroringExtensionProperties {
     TARGET_CLUSTER("targetCluster", "Target cluster name"),
     TARGET_METASTORE_URI("targetMetastoreUri", "Target Hive metastore uri", false),
     TARGET_HS2_URI("targetHiveServer2Uri", "Target HS2 uri"),
+    TARGET_HS2_EXTRA_OPTS("targetHiveServer2ExtraOpts", "Target HS2 extra opts", false),
     TARGET_STAGING_PATH("targetStagingPath", "Location of target staging path", false),
     TARGET_NN("targetNN", "Target name node", false),
     TARGET_NN_KERBEROS_PRINCIPAL("targetNNKerberosPrincipal", "Target name node kerberos
principal", false),


Mime
View raw message