hive-commits mailing list archives

From li...@apache.org
Subject hive git commit: HIVE-15104: Hive on Spark generate more shuffle data than hive on mr (Rui reviewed by Xuefu)
Date Wed, 25 Oct 2017 03:04:36 GMT
Repository: hive
Updated Branches:
  refs/heads/master 84950cfa6 -> 954f83284


HIVE-15104: Hive on Spark generate more shuffle data than hive on mr (Rui reviewed by Xuefu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/954f8328
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/954f8328
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/954f8328

Branch: refs/heads/master
Commit: 954f832849937fb1a95bcba602f24b79cf611413
Parents: 84950cf
Author: Rui Li <lirui@apache.org>
Authored: Wed Oct 25 11:04:28 2017 +0800
Committer: Rui Li <lirui@apache.org>
Committed: Wed Oct 25 11:04:28 2017 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   3 +
 .../test/resources/testconfiguration.properties |   3 +-
 kryo-registrator/pom.xml                        |  49 +++++
 .../apache/hive/spark/HiveKryoRegistrator.java  |  72 +++++++
 packaging/pom.xml                               |   5 +
 pom.xml                                         |   1 +
 .../ql/exec/spark/HiveSparkClientFactory.java   |  17 +-
 .../ql/exec/spark/LocalHiveSparkClient.java     |  24 ++-
 .../clientpositive/spark_opt_shuffle_serde.q    |   7 +
 .../spark/spark_opt_shuffle_serde.q.out         | 216 +++++++++++++++++++
 .../hive/spark/client/SparkClientImpl.java      |   7 +
 .../hive/spark/client/SparkClientUtilities.java |  58 ++++-
 12 files changed, 449 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/954f8328/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 2eaf553..17e5cd5 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3434,6 +3434,9 @@ public class HiveConf extends Configuration {
         "If this is set to true, mapjoin optimization in Hive/Spark will use statistics from\n"
+
         "TableScan operators at the root of operator tree, instead of parent ReduceSink\n"
+
         "operators of the Join operator."),
+    SPARK_OPTIMIZE_SHUFFLE_SERDE("hive.spark.optimize.shuffle.serde", false,
+        "If this is set to true, Hive on Spark will register custom serializers for data
types\n" +
+        "in shuffle. This should result in less shuffled data."),
     SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
       "60s", new TimeValidator(TimeUnit.SECONDS),
       "Timeout for requests from Hive client to remote Spark driver."),

http://git-wip-us.apache.org/repos/asf/hive/blob/954f8328/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index c338826..9f9b914 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1448,7 +1448,8 @@ miniSparkOnYarn.only.query.files=spark_combine_equivalent_work.q,\
   spark_vectorized_dynamic_partition_pruning.q,\
   spark_use_ts_stats_for_mapjoin.q,\
   spark_use_op_stats.q,\
-  spark_explain_groupbyshuffle.q
+  spark_explain_groupbyshuffle.q,\
+  spark_opt_shuffle_serde.q
 
 miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
   bucket4.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/954f8328/kryo-registrator/pom.xml
----------------------------------------------------------------------
diff --git a/kryo-registrator/pom.xml b/kryo-registrator/pom.xml
new file mode 100644
index 0000000..a5279fa
--- /dev/null
+++ b/kryo-registrator/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>hive</artifactId>
+    <groupId>org.apache.hive</groupId>
+    <version>3.0.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>hive-kryo-registrator</artifactId>
+  <packaging>jar</packaging>
+  <name>Hive Kryo Registrator</name>
+
+  <properties>
+    <hive.path.to.root>..</hive.path.to.root>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${project.version}</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <optional>true</optional>
+    </dependency>
+  </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/954f8328/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java
----------------------------------------------------------------------
diff --git a/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java b/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java
new file mode 100644
index 0000000..62ba0eb
--- /dev/null
+++ b/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.spark;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.serializer.KryoRegistrator;
+
+/**
+ * Kryo registrator for shuffle data, i.e. HiveKey and BytesWritable.
+ *
+ * Active use (e.g. reflection to get a class instance) of this class on hive side can cause
+ * problems because kryo is relocated in hive-exec.
+ */
+public class HiveKryoRegistrator implements KryoRegistrator {
+  @Override
+  public void registerClasses(Kryo kryo) {
+    kryo.register(HiveKey.class, new HiveKeySerializer());
+    kryo.register(BytesWritable.class, new BytesWritableSerializer());
+  }
+
+  private static class HiveKeySerializer extends Serializer<HiveKey> {
+
+    public void write(Kryo kryo, Output output, HiveKey object) {
+      output.writeVarInt(object.getLength(), true);
+      output.write(object.getBytes(), 0, object.getLength());
+      output.writeVarInt(object.hashCode(), false);
+    }
+
+    public HiveKey read(Kryo kryo, Input input, Class<HiveKey> type) {
+      int len = input.readVarInt(true);
+      byte[] bytes = new byte[len];
+      input.readBytes(bytes);
+      return new HiveKey(bytes, input.readVarInt(false));
+    }
+  }
+
+  private static class BytesWritableSerializer extends Serializer<BytesWritable> {
+
+    public void write(Kryo kryo, Output output, BytesWritable object) {
+      output.writeVarInt(object.getLength(), true);
+      output.write(object.getBytes(), 0, object.getLength());
+    }
+
+    public BytesWritable read(Kryo kryo, Input input, Class<BytesWritable> type) {
+      int len = input.readVarInt(true);
+      byte[] bytes = new byte[len];
+      input.readBytes(bytes);
+      return new BytesWritable(bytes);
+    }
+
+  }
+}
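
A hypothetical round-trip check for the two serializers above (not part of the patch; assumes the unrelocated kryo, hive-exec and hadoop-common classes are on the classpath):

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hive.spark.HiveKryoRegistrator;

public class HiveKryoRoundTripSketch {
  public static void main(String[] args) {
    Kryo kryo = new Kryo();
    new HiveKryoRegistrator().registerClasses(kryo);

    // A key with an explicit distribution hash, as a ReduceSink would produce.
    HiveKey key = new HiveKey("row-key".getBytes(), 42);

    Output out = new Output(256, -1);   // small, growable buffer
    kryo.writeObject(out, key);
    out.flush();

    HiveKey copy = kryo.readObject(new Input(out.toBytes()), HiveKey.class);
    // Length, bytes and hash code survive; only a varint length, the payload
    // and the hash are written, instead of Kryo's generic object graph.
    System.out.println(copy.getLength() + " / " + copy.hashCode());
  }
}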

http://git-wip-us.apache.org/repos/asf/hive/blob/954f8328/packaging/pom.xml
----------------------------------------------------------------------
diff --git a/packaging/pom.xml b/packaging/pom.xml
index beddd1c..52ad6a2 100644
--- a/packaging/pom.xml
+++ b/packaging/pom.xml
@@ -287,6 +287,11 @@
       <artifactId>hive-webhcat-java-client</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-kryo-registrator</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/hive/blob/954f8328/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b521cc4..64f4b92 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,6 +55,7 @@
     <module>llap-server</module>
     <module>shims</module>
     <module>spark-client</module>
+    <module>kryo-registrator</module>
     <module>storage-api</module>
     <module>testutils</module>
     <module>packaging</module>

http://git-wip-us.apache.org/repos/asf/hive/blob/954f8328/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 4fcb9bd..bdb0798 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -71,7 +71,7 @@ public class HiveSparkClientFactory {
     String master = sparkConf.get("spark.master");
     if (master.equals("local") || master.startsWith("local[")) {
       // With local spark context, all user sessions share the same spark context.
-      return LocalHiveSparkClient.getInstance(generateSparkConf(sparkConf));
+      return LocalHiveSparkClient.getInstance(generateSparkConf(sparkConf), hiveconf);
     } else {
       return new RemoteHiveSparkClient(hiveconf, sparkConf);
     }
@@ -208,13 +208,20 @@ public class HiveSparkClientFactory {
       }
     }
 
+    final boolean optShuffleSerDe = hiveConf.getBoolVar(
+        HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE);
+
     Set<String> classes = Sets.newHashSet(
-      Splitter.on(",").trimResults().omitEmptyStrings().split(
-        Strings.nullToEmpty(sparkConf.get("spark.kryo.classesToRegister"))));
+        Splitter.on(",").trimResults().omitEmptyStrings().split(
+            Strings.nullToEmpty(sparkConf.get("spark.kryo.classesToRegister"))));
     classes.add(Writable.class.getName());
     classes.add(VectorizedRowBatch.class.getName());
-    classes.add(BytesWritable.class.getName());
-    classes.add(HiveKey.class.getName());
+    if (!optShuffleSerDe) {
+      classes.add(HiveKey.class.getName());
+      classes.add(BytesWritable.class.getName());
+    } else {
+      sparkConf.put("spark.kryo.registrator", SparkClientUtilities.HIVE_KRYO_REG_NAME);
+    }
     sparkConf.put("spark.kryo.classesToRegister", Joiner.on(",").join(classes));
 
     // set yarn queue name
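
In effect, the branch above either keeps listing HiveKey/BytesWritable in spark.kryo.classesToRegister (the pre-existing behavior) or hands their registration over to the new registrator. A small self-contained sketch of the resulting Spark properties (a hypothetical map, not the actual factory code; other classes such as Writable and VectorizedRowBatch remain in classesToRegister on both paths):

import java.util.HashMap;
import java.util.Map;

public class ShuffleSerdeConfSketch {
  public static void main(String[] args) {
    boolean optShuffleSerDe = true;  // value of hive.spark.optimize.shuffle.serde
    Map<String, String> sparkConf = new HashMap<>();
    if (!optShuffleSerDe) {
      // Default path: Hive registers the classes, Kryo uses its generic serializers.
      sparkConf.put("spark.kryo.classesToRegister",
          "org.apache.hadoop.hive.ql.io.HiveKey,org.apache.hadoop.io.BytesWritable");
    } else {
      // Optimized path: the registrator wires the compact custom serializers.
      sparkConf.put("spark.kryo.registrator", "org.apache.hive.spark.HiveKryoRegistrator");
    }
    System.out.println(sparkConf);
  }
}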

http://git-wip-us.apache.org/repos/asf/hive/blob/954f8328/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
index 72f2f91..8956a92 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -18,10 +18,14 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.net.MalformedURLException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hive.spark.client.SparkClientUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -66,9 +70,10 @@ public class LocalHiveSparkClient implements HiveSparkClient {
 
   private static LocalHiveSparkClient client;
 
-  public static synchronized LocalHiveSparkClient getInstance(SparkConf sparkConf) {
+  public static synchronized LocalHiveSparkClient getInstance(
+      SparkConf sparkConf, HiveConf hiveConf) throws FileNotFoundException, MalformedURLException {
     if (client == null) {
-      client = new LocalHiveSparkClient(sparkConf);
+      client = new LocalHiveSparkClient(sparkConf, hiveConf);
     }
     return client;
   }
@@ -81,8 +86,21 @@ public class LocalHiveSparkClient implements HiveSparkClient {
 
   private final JobMetricsListener jobMetricsListener;
 
-  private LocalHiveSparkClient(SparkConf sparkConf) {
+  private LocalHiveSparkClient(SparkConf sparkConf, HiveConf hiveConf)
+      throws FileNotFoundException, MalformedURLException {
+    String regJar = null;
+    // the registrator jar should already be in CP when not in test mode
+    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_IN_TEST)) {
+      String kryoReg = sparkConf.get("spark.kryo.registrator", "");
+      if (SparkClientUtilities.HIVE_KRYO_REG_NAME.equals(kryoReg)) {
+        regJar = SparkClientUtilities.findKryoRegistratorJar(hiveConf);
+        SparkClientUtilities.addJarToContextLoader(new File(regJar));
+      }
+    }
     sc = new JavaSparkContext(sparkConf);
+    if (regJar != null) {
+      sc.addJar(regJar);
+    }
     jobMetricsListener = new JobMetricsListener();
     sc.sc().listenerBus().addListener(jobMetricsListener);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/954f8328/ql/src/test/queries/clientpositive/spark_opt_shuffle_serde.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/spark_opt_shuffle_serde.q b/ql/src/test/queries/clientpositive/spark_opt_shuffle_serde.q
new file mode 100644
index 0000000..2c4691a
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/spark_opt_shuffle_serde.q
@@ -0,0 +1,7 @@
+set hive.spark.optimize.shuffle.serde=true;
+
+set hive.spark.use.groupby.shuffle=true;
+select key, count(*) from src group by key order by key limit 100;
+
+set hive.spark.use.groupby.shuffle=false;
+select key, count(*) from src group by key order by key limit 100;

http://git-wip-us.apache.org/repos/asf/hive/blob/954f8328/ql/src/test/results/clientpositive/spark/spark_opt_shuffle_serde.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/spark_opt_shuffle_serde.q.out b/ql/src/test/results/clientpositive/spark/spark_opt_shuffle_serde.q.out
new file mode 100644
index 0000000..cd9c7bc
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/spark_opt_shuffle_serde.q.out
@@ -0,0 +1,216 @@
+PREHOOK: query: select key, count(*) from src group by key order by key limit 100
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: select key, count(*) from src group by key order by key limit 100
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+0	3
+10	1
+100	2
+103	2
+104	2
+105	1
+11	1
+111	1
+113	2
+114	1
+116	1
+118	2
+119	3
+12	2
+120	2
+125	2
+126	1
+128	3
+129	2
+131	1
+133	1
+134	2
+136	1
+137	2
+138	4
+143	1
+145	1
+146	2
+149	2
+15	2
+150	1
+152	2
+153	1
+155	1
+156	1
+157	1
+158	1
+160	1
+162	1
+163	1
+164	2
+165	2
+166	1
+167	3
+168	1
+169	4
+17	1
+170	1
+172	2
+174	2
+175	2
+176	2
+177	1
+178	1
+179	2
+18	2
+180	1
+181	1
+183	1
+186	1
+187	3
+189	1
+19	1
+190	1
+191	2
+192	1
+193	3
+194	1
+195	2
+196	1
+197	2
+199	3
+2	1
+20	1
+200	2
+201	1
+202	1
+203	2
+205	2
+207	2
+208	3
+209	2
+213	2
+214	1
+216	2
+217	2
+218	1
+219	2
+221	2
+222	1
+223	2
+224	2
+226	1
+228	1
+229	2
+230	5
+233	2
+235	1
+237	2
+238	2
+PREHOOK: query: select key, count(*) from src group by key order by key limit 100
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: select key, count(*) from src group by key order by key limit 100
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+0	3
+10	1
+100	2
+103	2
+104	2
+105	1
+11	1
+111	1
+113	2
+114	1
+116	1
+118	2
+119	3
+12	2
+120	2
+125	2
+126	1
+128	3
+129	2
+131	1
+133	1
+134	2
+136	1
+137	2
+138	4
+143	1
+145	1
+146	2
+149	2
+15	2
+150	1
+152	2
+153	1
+155	1
+156	1
+157	1
+158	1
+160	1
+162	1
+163	1
+164	2
+165	2
+166	1
+167	3
+168	1
+169	4
+17	1
+170	1
+172	2
+174	2
+175	2
+176	2
+177	1
+178	1
+179	2
+18	2
+180	1
+181	1
+183	1
+186	1
+187	3
+189	1
+19	1
+190	1
+191	2
+192	1
+193	3
+194	1
+195	2
+196	1
+197	2
+199	3
+2	1
+20	1
+200	2
+201	1
+202	1
+203	2
+205	2
+207	2
+208	3
+209	2
+213	2
+214	1
+216	2
+217	2
+218	1
+219	2
+221	2
+222	1
+223	2
+224	2
+226	1
+228	1
+229	2
+230	5
+233	2
+235	1
+237	2
+238	2

http://git-wip-us.apache.org/repos/asf/hive/blob/954f8328/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index e0ec3b7..f6a23dc 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -18,6 +18,7 @@
 package org.apache.hive.spark.client;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.apache.hive.spark.client.SparkClientUtilities.HIVE_KRYO_REG_NAME;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
@@ -442,6 +443,12 @@ class SparkClientImpl implements SparkClient {
         }
       }
 
+      String regStr = conf.get("spark.kryo.registrator");
+      if (HIVE_KRYO_REG_NAME.equals(regStr)) {
+        argv.add("--jars");
+        argv.add(SparkClientUtilities.findKryoRegistratorJar(hiveConf));
+      }
+
       argv.add("--properties-file");
       argv.add(properties.getAbsolutePath());
       argv.add("--class");

http://git-wip-us.apache.org/repos/asf/hive/blob/954f8328/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
index 210da2a..27bc198 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,9 +18,12 @@
 
 package org.apache.hive.spark.client;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 
 import java.io.File;
+import java.io.FileNotFoundException;
+import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
@@ -29,17 +32,24 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.util.MutableURLClassLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import scala.Option;
 
 public class SparkClientUtilities {
   protected static final transient Logger LOG = LoggerFactory.getLogger(SparkClientUtilities.class);
 
   private static final Map<String, Long> downloadedFiles = new ConcurrentHashMap<>();
 
+  public static final String HIVE_KRYO_REG_NAME = "org.apache.hive.spark.HiveKryoRegistrator";
+  private static final String HIVE_KRYO_REG_JAR_NAME = "hive-kryo-registrator";
+
   /**
    * Add new elements to the classpath.
    *
@@ -74,7 +84,8 @@ public class SparkClientUtilities {
   /**
    * Create a URL from a string representing a path to a local file.
    * The path string can be just a path, or can start with file:/, file:///
-   * @param path  path string
+   *
+   * @param path path string
    * @return
    */
   private static URL urlFromPathString(String path, Long timeStamp,
@@ -136,4 +147,43 @@ public class SparkClientUtilities {
     }
     return null;
   }
+
+  public static String findKryoRegistratorJar(HiveConf conf) throws FileNotFoundException {
+    // find the jar in local maven repo for testing
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) {
+      String repo = System.getProperty("maven.local.repository");
+      String version = System.getProperty("hive.version");
+      String jarName = HIVE_KRYO_REG_JAR_NAME + "-" + version + ".jar";
+      String[] parts = new String[]{repo, "org", "apache", "hive",
+          HIVE_KRYO_REG_JAR_NAME, version, jarName};
+      String jar = Joiner.on(File.separator).join(parts);
+      if (!new File(jar).exists()) {
+        throw new FileNotFoundException(jar + " doesn't exist.");
+      }
+      return jar;
+    }
+    Option<String> option = SparkContext.jarOfClass(SparkClientUtilities.class);
+    if (!option.isDefined()) {
+      throw new FileNotFoundException("Cannot find the path to hive-exec.jar");
+    }
+    File path = new File(option.get());
+    File[] jars = path.getParentFile().listFiles((dir, name) ->
+        name.startsWith(HIVE_KRYO_REG_JAR_NAME));
+    if (jars != null && jars.length > 0) {
+      return jars[0].getAbsolutePath();
+    }
+    throw new FileNotFoundException("Cannot find the " + HIVE_KRYO_REG_JAR_NAME +
+        " jar under " + path.getParent());
+  }
+
+  public static void addJarToContextLoader(File jar) throws MalformedURLException {
+    ClassLoader loader = Thread.currentThread().getContextClassLoader();
+    if (loader instanceof MutableURLClassLoader) {
+      ((MutableURLClassLoader) loader).addURL(jar.toURI().toURL());
+    } else {
+      URLClassLoader newLoader =
+          new URLClassLoader(new URL[]{jar.toURI().toURL()}, loader);
+      Thread.currentThread().setContextClassLoader(newLoader);
+    }
+  }
 }
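
The two new helpers can be exercised together; a hypothetical standalone caller (the wrapper class is illustrative, the helper signatures are the ones added by this patch):

import java.io.File;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.spark.client.SparkClientUtilities;

public class RegistratorJarSetupSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    // Locates hive-kryo-registrator-*.jar: from the local maven repo in test mode,
    // otherwise from the directory of the jar containing SparkClientUtilities.
    String jar = SparkClientUtilities.findKryoRegistratorJar(conf);
    // Makes HiveKryoRegistrator loadable from the current thread's context class loader.
    SparkClientUtilities.addJarToContextLoader(new File(jar));
    System.out.println("Registrator jar on context classpath: " + jar);
  }
}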

