hive-commits mailing list archives

From xu...@apache.org
Subject svn commit: r1660640 - in /hive/trunk: ./ hbase-handler/ itests/src/test/resources/ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ ql/src/java/org/apache/hadoop/hive/ql/parse/ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/ ql/src/test/org/apache...
Date Wed, 18 Feb 2015 14:52:15 GMT
Author: xuefu
Date: Wed Feb 18 14:52:15 2015
New Revision: 1660640

URL: http://svn.apache.org/r1660640
Log:
HIVE-9703: Merge from Spark branch to trunk 02/16/2015 (Reviewed by Brock)

Added:
    hive/trunk/ql/src/test/results/clientpositive/spark/cbo_gby_empty.q.out
      - copied unchanged from r1660298, hive/branches/spark/ql/src/test/results/clientpositive/spark/cbo_gby_empty.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/udf_in_file.q.out
      - copied unchanged from r1660298, hive/branches/spark/ql/src/test/results/clientpositive/spark/udf_in_file.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/union_view.q.out
      - copied unchanged from r1660298, hive/branches/spark/ql/src/test/results/clientpositive/spark/union_view.q.out
Modified:
    hive/trunk/   (props changed)
    hive/trunk/hbase-handler/pom.xml   (props changed)
    hive/trunk/itests/src/test/resources/testconfiguration.properties
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
    hive/trunk/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java

Propchange: hive/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 18 14:52:15 2015
@@ -1,6 +1,6 @@
 /hive/branches/branch-0.11:1480385,1480458,1481120,1481344,1481346,1481348,1481352,1483872,1505184
 /hive/branches/branch-1.1:1658284,1659437,1659724
 /hive/branches/cbo:1605012-1627125
-/hive/branches/spark:1608589-1657401
+/hive/branches/spark:1608589-1660298
 /hive/branches/tez:1494760-1622766
 /hive/branches/vectorization:1466908-1527856

Propchange: hive/trunk/hbase-handler/pom.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 18 14:52:15 2015
@@ -1,6 +1,6 @@
 /hive/branches/branch-0.11/hbase-handler/pom.xml:1480385,1480458,1481120,1481344,1481346,1481348,1481352,1483872,1505184
 /hive/branches/cbo/hbase-handler/pom.xml:1605012-1627125
-/hive/branches/spark/hbase-handler/pom.xml:1608589-1657401
+/hive/branches/spark/hbase-handler/pom.xml:1608589-1660298
 /hive/branches/tez/hbase-handler/pom.xml:1494760-1622766
 /hive/branches/vectorization/hbase-handler/pom.xml:1466908-1527856
 /hive/trunk/hbase-handler/pom.xml:1494760-1537575

Modified: hive/trunk/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/trunk/itests/src/test/resources/testconfiguration.properties?rev=1660640&r1=1660639&r2=1660640&view=diff
==============================================================================
--- hive/trunk/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/trunk/itests/src/test/resources/testconfiguration.properties Wed Feb 18 14:52:15 2015
@@ -574,6 +574,7 @@ spark.query.files=add_part_multiple.q, \
   bucketsortoptimize_insert_6.q, \
   bucketsortoptimize_insert_7.q, \
   bucketsortoptimize_insert_8.q, \
+  cbo_gby_empty.q, \
   column_access_stats.q, \
   count.q, \
   create_merge_compressed.q, \
@@ -909,6 +910,7 @@ spark.query.files=add_part_multiple.q, \
   transform_ppr1.q, \
   transform_ppr2.q, \
   udf_example_add.q, \
+  udf_in_file.q, \
   union.q, \
   union10.q, \
   union11.q, \
@@ -955,6 +957,7 @@ spark.query.files=add_part_multiple.q, \
   union_remove_8.q, \
   union_remove_9.q, \
   uniquejoin.q, \
+  union_view.q, \
   varchar_join1.q, \
   vector_between_in.q, \
   vector_cast_constant.q, \

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java?rev=1660640&r1=1660639&r2=1660640&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java Wed Feb 18 14:52:15 2015
@@ -22,7 +22,6 @@ import java.io.Serializable;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -49,9 +48,9 @@ public abstract class HiveBaseFunctionRe
   private final HiveKVResultCache lastRecordOutput;
   private boolean iteratorAlreadyCreated = false;
 
-  public HiveBaseFunctionResultList(Configuration conf, Iterator<T> inputIterator) {
+  public HiveBaseFunctionResultList(Iterator<T> inputIterator) {
     this.inputIterator = inputIterator;
-    this.lastRecordOutput = new HiveKVResultCache(conf);
+    this.lastRecordOutput = new HiveKVResultCache();
   }
 
   @Override
@@ -87,8 +86,6 @@ public abstract class HiveBaseFunctionRe
         return true;
       }
 
-      lastRecordOutput.clear();
-
       // Process the records in the input iterator until
       //  - new output records are available for serving downstream operator,
       //  - input records are exhausted or

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java?rev=1660640&r1=1660639&r2=1660640&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java Wed Feb 18 14:52:15 2015
@@ -17,141 +17,251 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.ql.io.HiveKey;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.Reporter;
 
 import scala.Tuple2;
 
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
- * Wrapper around {@link org.apache.hadoop.hive.ql.exec.persistence.RowContainer}
- *
- * This class is thread safe.
+ * A cache with a fixed-size in-memory buffer. When the buffer is full, new
+ * entries are written to disk. This class is thread safe because multiple
+ * threads may access it (though not necessarily concurrently), for example
+ * the StreamThread in ScriptOperator.
  */
-@SuppressWarnings({"deprecation", "unchecked", "rawtypes"})
-public class HiveKVResultCache {
-
-  public static final int IN_MEMORY_CACHE_SIZE = 512;
-  private static final String COL_NAMES = "key,value";
-  private static final String COL_TYPES =
-      serdeConstants.BINARY_TYPE_NAME + ":" + serdeConstants.BINARY_TYPE_NAME;
-
-  // Used to cache rows added while container is iterated.
-  private RowContainer backupContainer;
-
-  private RowContainer container;
-  private Configuration conf;
-  private int cursor = 0;
-
-  public HiveKVResultCache(Configuration conf) {
-    container = initRowContainer(conf);
-    this.conf = conf;
+@SuppressWarnings("unchecked")
+class HiveKVResultCache {
+  private static final Log LOG = LogFactory.getLog(HiveKVResultCache.class);
+
+  @VisibleForTesting
+  static final int IN_MEMORY_NUM_ROWS = 1024;
+
+  private ObjectPair<HiveKey, BytesWritable>[] writeBuffer;
+  private ObjectPair<HiveKey, BytesWritable>[] readBuffer;
+
+  private File parentFile;
+  private File tmpFile;
+
+  private int readCursor = 0;
+  private int writeCursor = 0;
+
+  // Indicates whether the read buffer has data; for example, while reading,
+  // data spilled to disk may be pulled into it.
+  private boolean readBufferUsed = false;
+  private int rowsInReadBuffer = 0;
+
+  private Input input;
+  private Output output;
+
+  public HiveKVResultCache() {
+    writeBuffer = new ObjectPair[IN_MEMORY_NUM_ROWS];
+    readBuffer = new ObjectPair[IN_MEMORY_NUM_ROWS];
+    for (int i = 0; i < IN_MEMORY_NUM_ROWS; i++) {
+      writeBuffer[i] = new ObjectPair<HiveKey, BytesWritable>();
+      readBuffer[i] = new ObjectPair<HiveKey, BytesWritable>();
+    }
   }
 
-  private static RowContainer initRowContainer(Configuration conf) {
-    RowContainer container;
-    try {
-      container = new RowContainer(IN_MEMORY_CACHE_SIZE, conf, Reporter.NULL);
-
-      String fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
-      TableDesc tableDesc =
-          PlanUtils.getDefaultQueryOutputTableDesc(COL_NAMES, COL_TYPES, fileFormat);
+  private void switchBufferAndResetCursor() {
+    ObjectPair<HiveKey, BytesWritable>[] tmp = readBuffer;
+    rowsInReadBuffer = writeCursor;
+    readBuffer = writeBuffer;
+    readBufferUsed = true;
+    readCursor = 0;
+    writeBuffer = tmp;
+    writeCursor = 0;
+  }
+
+  private void setupOutput() throws IOException {
+    if (parentFile == null) {
+      while (true) {
+        parentFile = File.createTempFile("hive-resultcache", "");
+        if (parentFile.delete() && parentFile.mkdir()) {
+          parentFile.deleteOnExit();
+          break;
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Retry creating tmp result-cache directory...");
+        }
+      }
+    }
 
-      SerDe serDe = (SerDe) tableDesc.getDeserializer();
-      ObjectInspector oi = ObjectInspectorUtils.getStandardObjectInspector(
-          serDe.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
+    if (tmpFile == null || input != null) {
+      tmpFile = File.createTempFile("ResultCache", ".tmp", parentFile);
+      LOG.info("ResultCache created temp file " + tmpFile.getAbsolutePath());
+      tmpFile.deleteOnExit();
+    }
 
-      container.setSerDe(serDe, oi);
-      container.setTableDesc(tableDesc);
-    } catch (Exception ex) {
-      throw new RuntimeException("Failed to create RowContainer", ex);
+    FileOutputStream fos = null;
+    try {
+      fos = new FileOutputStream(tmpFile);
+      output = new Output(fos);
+    } finally {
+      if (output == null && fos != null) {
+        fos.close();
+      }
     }
-    return container;
   }
 
-  public void add(HiveKey key, BytesWritable value) {
-    byte[] hiveKeyBytes = KryoSerializer.serialize(key);
-    BytesWritable wrappedHiveKey = new BytesWritable(hiveKeyBytes);
-    List<BytesWritable> row = new ArrayList<BytesWritable>(2);
-    row.add(wrappedHiveKey);
-    row.add(value);
+  private BytesWritable readValue(Input input) {
+    return new BytesWritable(input.readBytes(input.readInt()));
+  }
 
-    synchronized (this) {
-      try {
-        if (cursor == 0) {
-          container.addRow(row);
-        } else {
-          if (backupContainer == null) {
-            backupContainer = initRowContainer(conf);
+  private void writeValue(Output output, BytesWritable bytesWritable) {
+    int size = bytesWritable.getLength();
+    output.writeInt(size);
+    output.writeBytes(bytesWritable.getBytes(), 0, size);
+  }
+
+  private HiveKey readHiveKey(Input input) {
+    HiveKey hiveKey = new HiveKey(
+      input.readBytes(input.readInt()), input.readInt());
+    hiveKey.setDistKeyLength(input.readInt());
+    return hiveKey;
+  }
+
+  private void writeHiveKey(Output output, HiveKey hiveKey) {
+    int size = hiveKey.getLength();
+    output.writeInt(size);
+    output.writeBytes(hiveKey.getBytes(), 0, size);
+    output.writeInt(hiveKey.hashCode());
+    output.writeInt(hiveKey.getDistKeyLength());
+  }
+
+  public synchronized void add(HiveKey key, BytesWritable value) {
+    if (writeCursor >= IN_MEMORY_NUM_ROWS) { // Write buffer is full
+      if (!readBufferUsed) { // Read buffer isn't used, switch buffer
+        switchBufferAndResetCursor();
+      } else {
+        // Need to spill from write buffer to disk
+        try {
+          if (output == null) {
+            setupOutput();
           }
-          backupContainer.addRow(row);
+          for (int i = 0; i < IN_MEMORY_NUM_ROWS; i++) {
+            ObjectPair<HiveKey, BytesWritable> pair = writeBuffer[i];
+            writeHiveKey(output, pair.getFirst());
+            writeValue(output, pair.getSecond());
+            pair.setFirst(null);
+            pair.setSecond(null);
+          }
+          writeCursor = 0;
+        } catch (Exception e) {
+          clear(); // Clean up the cache
+          throw new RuntimeException("Failed to spill rows to disk", e);
         }
-      } catch (HiveException ex) {
-        throw new RuntimeException("Failed to add KV pair to RowContainer", ex);
       }
     }
+    ObjectPair<HiveKey, BytesWritable> pair = writeBuffer[writeCursor++];
+    pair.setFirst(key);
+    pair.setSecond(value);
   }
 
   public synchronized void clear() {
-    if (cursor == 0) {
-      return;
-    }
-    try {
-      container.clearRows();
-    } catch (HiveException ex) {
-      throw new RuntimeException("Failed to clear rows in RowContainer", ex);
+    writeCursor = readCursor = rowsInReadBuffer = 0;
+    readBufferUsed = false;
+
+    if (parentFile != null) {
+      if (input != null) {
+        try {
+          input.close();
+        } catch (Throwable ignored) {
+        }
+        input = null;
+      }
+      if (output != null) {
+        try {
+          output.close();
+        } catch (Throwable ignored) {
+        }
+        output = null;
+      }
+      try {
+        FileUtil.fullyDelete(parentFile);
+      } catch (Throwable ignored) {
+      }
+      parentFile = null;
+      tmpFile = null;
     }
-    cursor = 0;
   }
 
   public synchronized boolean hasNext() {
-    if (container.rowCount() > 0 && cursor < container.rowCount()) {
-      return true;
-    }
-    if (backupContainer == null
-        || backupContainer.rowCount() == 0) {
-      return false;
-    }
-    clear();
-    // Switch containers
-    RowContainer tmp = container;
-    container = backupContainer;
-    backupContainer = tmp;
-    return true;
+    return readBufferUsed || writeCursor > 0;
   }
 
-  public Tuple2<HiveKey, BytesWritable> next() {
-    try {
-      List<BytesWritable> row;
-      synchronized (this) {
-        Preconditions.checkState(hasNext());
-        if (cursor == 0) {
-          row = container.first();
+  public synchronized Tuple2<HiveKey, BytesWritable> next() {
+    Preconditions.checkState(hasNext());
+    if (!readBufferUsed) {
+      try {
+        if (input == null && output != null) {
+          // Close output stream if open
+          output.close();
+          output = null;
+
+          FileInputStream fis = null;
+          try {
+            fis = new FileInputStream(tmpFile);
+            input = new Input(fis);
+          } finally {
+            if (input == null && fis != null) {
+              fis.close();
+            }
+          }
+        }
+        if (input != null) {
+          // Load next batch from disk
+          for (int i = 0; i < IN_MEMORY_NUM_ROWS; i++) {
+            ObjectPair<HiveKey, BytesWritable> pair = readBuffer[i];
+            pair.setFirst(readHiveKey(input));
+            pair.setSecond(readValue(input));
+          }
+          if (input.eof()) {
+            input.close();
+            input = null;
+          }
+          rowsInReadBuffer = IN_MEMORY_NUM_ROWS;
+          readBufferUsed = true;
+          readCursor = 0;
+        } else if (writeCursor == 1) {
+          ObjectPair<HiveKey, BytesWritable> pair = writeBuffer[0];
+          Tuple2<HiveKey, BytesWritable> row = new Tuple2<HiveKey, BytesWritable>(
+            pair.getFirst(), pair.getSecond());
+          pair.setFirst(null);
+          pair.setSecond(null);
+          writeCursor = 0;
+          return row;
         } else {
-          row = container.next();
+          // No record on disk, more data in write buffer
+          switchBufferAndResetCursor();
         }
-        cursor++;
+      } catch (Exception e) {
+        clear(); // Clean up the cache
+        throw new RuntimeException("Failed to load rows from disk", e);
       }
-      HiveKey key = KryoSerializer.deserialize(row.get(0).getBytes(), HiveKey.class);
-      return new Tuple2<HiveKey, BytesWritable>(key, row.get(1));
-    } catch (HiveException ex) {
-      throw new RuntimeException("Failed to get row from RowContainer", ex);
     }
+    ObjectPair<HiveKey, BytesWritable> pair = readBuffer[readCursor];
+    Tuple2<HiveKey, BytesWritable> row = new Tuple2<HiveKey, BytesWritable>(
+      pair.getFirst(), pair.getSecond());
+    pair.setFirst(null);
+    pair.setSecond(null);
+    if (++readCursor >= rowsInReadBuffer) {
+      readBufferUsed = false;
+      rowsInReadBuffer = 0;
+      readCursor = 0;
+    }
+    return row;
   }
 }
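
To make the new buffering behavior above concrete, here is a minimal usage sketch based only on the API shown in this diff. It is illustrative and not part of the patch; the class is now package-private, so the sketch assumes code living in the same org.apache.hadoop.hive.ql.exec.spark package, and the class name ResultCacheSketch is made up.

  package org.apache.hadoop.hive.ql.exec.spark;

  import org.apache.hadoop.hive.ql.io.HiveKey;
  import org.apache.hadoop.io.BytesWritable;

  import scala.Tuple2;

  public class ResultCacheSketch {
    static void demo() {
      HiveKVResultCache cache = new HiveKVResultCache();

      // add() buffers pairs in memory; once IN_MEMORY_NUM_ROWS (1024) rows are
      // pending and the read buffer is also in use, the full write buffer is
      // spilled to a temp file.
      cache.add(new HiveKey("key".getBytes(), "key".hashCode()),
          new BytesWritable("value".getBytes()));

      // next() serves rows from the read buffer, reloading spilled batches
      // from disk as needed; hasNext() reports whether anything is pending.
      while (cache.hasNext()) {
        Tuple2<HiveKey, BytesWritable> row = cache.next();
        // ... hand the pair to the downstream consumer ...
      }

      // clear() resets both buffers and removes any spill files.
      cache.clear();
    }
  }

The key difference from the old RowContainer-based version is that keys are no longer round-tripped through KryoSerializer on every add()/next(); serialization now happens only when a batch actually spills to disk.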

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java?rev=1660640&r1=1660639&r2=1660640&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java Wed Feb 18 14:52:15 2015
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import java.util.Iterator;
+
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
@@ -25,8 +27,6 @@ import org.apache.hadoop.io.BytesWritabl
 
 import scala.Tuple2;
 
-import java.util.Iterator;
-
 public class HiveMapFunction extends HivePairFlatMapFunction<
   Iterator<Tuple2<BytesWritable, BytesWritable>>, HiveKey, BytesWritable> {
 
@@ -51,7 +51,7 @@ public class HiveMapFunction extends Hiv
       mapRecordHandler = new SparkMapRecordHandler();
     }
 
-    HiveMapFunctionResultList result = new HiveMapFunctionResultList(jobConf, it, mapRecordHandler);
+    HiveMapFunctionResultList result = new HiveMapFunctionResultList(it, mapRecordHandler);
     mapRecordHandler.init(jobConf, result, sparkReporter);
 
     return result;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java?rev=1660640&r1=1660639&r2=1660640&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java Wed Feb 18 14:52:15 2015
@@ -17,27 +17,28 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import scala.Tuple2;
-
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.hadoop.io.BytesWritable;
+
+import scala.Tuple2;
+
 public class HiveMapFunctionResultList extends
     HiveBaseFunctionResultList<Tuple2<BytesWritable, BytesWritable>> {
+  private static final long serialVersionUID = 1L;
   private final SparkRecordHandler recordHandler;
 
   /**
    * Instantiate result set Iterable for Map function output.
    *
-   * @param conf Hive configuration.
    * @param inputIterator Input record iterator.
    * @param handler Initialized {@link SparkMapRecordHandler} instance.
    */
-  public HiveMapFunctionResultList(Configuration conf,
-      Iterator<Tuple2<BytesWritable, BytesWritable>> inputIterator, SparkRecordHandler handler) {
-    super(conf, inputIterator);
+  public HiveMapFunctionResultList(
+      Iterator<Tuple2<BytesWritable, BytesWritable>> inputIterator,
+      SparkRecordHandler handler) {
+    super(inputIterator);
     recordHandler = handler;
   }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java?rev=1660640&r1=1660639&r2=1660640&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java Wed Feb 18 14:52:15 2015
@@ -18,13 +18,13 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import java.util.Iterator;
+
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 
 import scala.Tuple2;
 
-import java.util.Iterator;
-
 public class HiveReduceFunction extends HivePairFlatMapFunction<
   Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>>, HiveKey, BytesWritable> {
 
@@ -42,7 +42,7 @@ public class HiveReduceFunction extends
 
     SparkReduceRecordHandler reducerRecordhandler = new SparkReduceRecordHandler();
     HiveReduceFunctionResultList result =
-        new HiveReduceFunctionResultList(jobConf, it, reducerRecordhandler);
+        new HiveReduceFunctionResultList(it, reducerRecordhandler);
     reducerRecordhandler.init(jobConf, result, sparkReporter);
 
     return result;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java?rev=1660640&r1=1660639&r2=1660640&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java Wed Feb 18 14:52:15 2015
@@ -17,29 +17,29 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import org.apache.hadoop.conf.Configuration;
+import java.io.IOException;
+import java.util.Iterator;
+
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
-import scala.Tuple2;
 
-import java.io.IOException;
-import java.util.Iterator;
+import scala.Tuple2;
 
 public class HiveReduceFunctionResultList extends
     HiveBaseFunctionResultList<Tuple2<HiveKey, Iterable<BytesWritable>>> {
+  private static final long serialVersionUID = 1L;
   private final SparkReduceRecordHandler reduceRecordHandler;
 
   /**
    * Instantiate result set Iterable for Reduce function output.
    *
-   * @param conf Hive configuration.
    * @param inputIterator Input record iterator.
    * @param reducer Initialized {@link org.apache.hadoop.hive.ql.exec.mr.ExecReducer} instance.
    */
-  public HiveReduceFunctionResultList(Configuration conf,
+  public HiveReduceFunctionResultList(
       Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> inputIterator,
-    SparkReduceRecordHandler reducer) {
-    super(conf, inputIterator);
+      SparkReduceRecordHandler reducer) {
+    super(inputIterator);
     this.reduceRecordHandler = reducer;
   }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java?rev=1660640&r1=1660639&r2=1660640&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java Wed Feb 18 14:52:15 2015
@@ -19,11 +19,11 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
+import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1660640&r1=1660639&r2=1660640&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java Wed Feb 18 14:52:15 2015
@@ -22,8 +22,8 @@ import com.google.common.base.Strings;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.net.MalformedURLException;
-import java.net.URL;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -77,8 +77,8 @@ public class RemoteHiveSparkClient imple
   private transient SparkConf sparkConf;
   private transient HiveConf hiveConf;
 
-  private transient List<URL> localJars = new ArrayList<URL>();
-  private transient List<URL> localFiles = new ArrayList<URL>();
+  private transient List<URI> localJars = new ArrayList<URI>();
+  private transient List<URI> localFiles = new ArrayList<URI>();
 
   private final transient long sparkClientTimtout;
 
@@ -128,7 +128,7 @@ public class RemoteHiveSparkClient imple
     return new RemoteSparkJobRef(hiveConf, jobHandle, sparkJobStatus);
   }
 
-  private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
+  private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) throws IOException {
     // add hive-exec jar
     addJars((new JobConf(this.getClass())).getJar());
 
@@ -160,30 +160,32 @@ public class RemoteHiveSparkClient imple
     addResources(addedArchives);
   }
 
-  private void addResources(String addedFiles) {
+  private void addResources(String addedFiles) throws IOException {
     for (String addedFile : CSV_SPLITTER.split(Strings.nullToEmpty(addedFiles))) {
       try {
-        URL fileUrl = SparkUtilities.getURL(addedFile);
-        if (fileUrl != null && !localFiles.contains(fileUrl)) {
-          localFiles.add(fileUrl);
-          remoteClient.addFile(fileUrl);
+        URI fileUri = SparkUtilities.getURI(addedFile);
+        if (fileUri != null && !localFiles.contains(fileUri)) {
+          fileUri = SparkUtilities.uploadToHDFS(fileUri, hiveConf);
+          localFiles.add(fileUri);
+          remoteClient.addFile(fileUri);
         }
-      } catch (MalformedURLException e) {
-        LOG.warn("Failed to add file:" + addedFile);
+      } catch (URISyntaxException e) {
+        LOG.warn("Failed to add file:" + addedFile, e);
       }
     }
   }
 
-  private void addJars(String addedJars) {
+  private void addJars(String addedJars) throws IOException {
     for (String addedJar : CSV_SPLITTER.split(Strings.nullToEmpty(addedJars))) {
       try {
-        URL jarUrl = SparkUtilities.getURL(addedJar);
-        if (jarUrl != null && !localJars.contains(jarUrl)) {
-          localJars.add(jarUrl);
-          remoteClient.addJar(jarUrl);
+        URI jarUri = SparkUtilities.getURI(addedJar);
+        if (jarUri != null && !localJars.contains(jarUri)) {
+          jarUri = SparkUtilities.uploadToHDFS(jarUri, hiveConf);
+          localJars.add(jarUri);
+          remoteClient.addJar(jarUri);
         }
-      } catch (MalformedURLException e) {
-        LOG.warn("Failed to add jar:" + addedJar);
+      } catch (URISyntaxException e) {
+        LOG.warn("Failed to add jar:" + addedJar, e);
       }
     }
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java?rev=1660640&r1=1660639&r2=1660640&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java Wed Feb 18 14:52:15 2015
@@ -18,11 +18,15 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
@@ -50,25 +54,50 @@ public class SparkUtilities {
     return copy;
   }
 
-  public static URL getURL(String path) throws MalformedURLException {
+  public static URI getURI(String path) throws URISyntaxException {
     if (path == null) {
       return null;
     }
 
-    URL url = null;
-    try {
       URI uri = new URI(path);
-      if (uri.getScheme() != null) {
-        url = uri.toURL();
-      } else {
+      if (uri.getScheme() == null) {
         // if no file schema in path, we assume it's file on local fs.
-        url = new File(path).toURI().toURL();
+        uri = new File(path).toURI();
       }
-    } catch (URISyntaxException e) {
-      // do nothing here, just return null if input path is not a valid URI.
+
+    return uri;
+  }
+
+  /**
+   * Copies a local file to HDFS when running in yarn-cluster mode.
+   *
+   * @param source the URI of the resource to upload
+   * @param conf the Hive configuration
+   * @return the URI of the copy on HDFS, or the source URI unchanged if no copy was needed
+   * @throws IOException
+   */
+  public static URI uploadToHDFS(URI source, HiveConf conf) throws IOException {
+    URI result = source;
+    if (conf.get("spark.master").equals("yarn-cluster")) {
+      if (!source.getScheme().equals("hdfs")) {
+        Path tmpDir = SessionState.getHDFSSessionPath(conf);
+        FileSystem fileSystem = FileSystem.get(conf);
+        fileSystem.copyFromLocalFile(new Path(source.getPath()), tmpDir);
+        String filePath = tmpDir + File.separator + getFileName(source);
+        Path fullPath = fileSystem.getFileStatus(new Path(filePath)).getPath();
+        result = fullPath.toUri();
+      }
+    }
+    return result;
+  }
+
+  private static String getFileName(URI uri) {
+    if (uri == null) {
+      return null;
     }
 
-    return url;
+    String[] splits = uri.getPath().split(File.separator);
+    return  splits[splits.length-1];
   }
 
   public static SparkSession getSparkSession(HiveConf conf,
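
The two helpers above replace the old getURL(). A rough usage sketch of the new flow, as RemoteHiveSparkClient now drives it (the class name AddJarSketch and the jar path are made up for illustration; spark.master is assumed to be set to yarn-cluster):

  import java.io.IOException;
  import java.net.URI;
  import java.net.URISyntaxException;

  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;

  public class AddJarSketch {
    static URI resolve(HiveConf conf) throws URISyntaxException, IOException {
      // A scheme-less path is treated as a local file and becomes a file: URI.
      URI jarUri = SparkUtilities.getURI("/tmp/my-udf.jar");

      // In yarn-cluster mode, anything not already on hdfs: is copied into the
      // HDFS session directory and that HDFS URI is returned; in every other
      // case the URI comes back unchanged.
      return SparkUtilities.uploadToHDFS(jarUri, conf);
    }
  }

The returned URI is what RemoteHiveSparkClient passes to remoteClient.addJar()/addFile(), which is why the SparkClient interface below now accepts URIs rather than URLs.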

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1660640&r1=1660639&r2=1660640&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Wed Feb 18 14:52:15 2015
@@ -8969,11 +8969,21 @@ public class SemanticAnalyzer extends Ba
       unionoutRR.put(unionalias, field, unionColInfo);
     }
 
-    if (!(leftOp instanceof UnionOperator)) {
+    // For Spark, we rely on the generated SelectOperator to do the type casting.
+    // Consider:
+    //    SEL_1 (int)   SEL_2 (int)    SEL_3 (double)
+    // If we first merge SEL_1 and SEL_2 into a UNION_1, and then merge UNION_1
+    // with SEL_3 to get UNION_2, then no SelectOperator will be inserted and an error
+    // will happen afterwards. The solution here is to insert one after UNION_1, which
+    // casts int to double.
+    boolean isSpark = HiveConf.getVar(conf,
+        HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark");
+
+    if (isSpark || !(leftOp instanceof UnionOperator)) {
       leftOp = genInputSelectForUnion(leftOp, leftmap, leftalias, unionoutRR, unionalias);
     }
 
-    if (!(rightOp instanceof UnionOperator)) {
+    if (isSpark || !(rightOp instanceof UnionOperator)) {
       rightOp = genInputSelectForUnion(rightOp, rightmap, rightalias, unionoutRR, unionalias);
     }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java?rev=1660640&r1=1660639&r2=1660640&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java Wed Feb 18 14:52:15 2015
@@ -189,7 +189,15 @@ public class GenSparkUtils {
     throws SemanticException {
 
     List<Operator<?>> roots = new ArrayList<Operator<?>>();
-    roots.addAll(work.getAllRootOperators());
+
+    // For MapWork, getAllRootOperators is not suitable, since it checks
+    // getPathToAliases, and will return null if this is empty. Here we are
+    // replacing getAliasToWork, so we should use that information instead.
+    if (work instanceof MapWork) {
+      roots.addAll(((MapWork) work).getAliasToWork().values());
+    } else {
+      roots.addAll(work.getAllRootOperators());
+    }
     if (work.getDummyOps() != null) {
       roots.addAll(work.getDummyOps());
     }

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java?rev=1660640&r1=1660639&r2=1660640&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java Wed Feb 18 14:52:15 2015
@@ -27,8 +27,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.junit.Test;
@@ -42,8 +40,7 @@ public class TestHiveKVResultCache {
   @Test
   public void testSimple() throws Exception {
     // Create KV result cache object, add one (k,v) pair and retrieve them.
-    HiveConf conf = new HiveConf();
-    HiveKVResultCache cache = new HiveKVResultCache(conf);
+    HiveKVResultCache cache = new HiveKVResultCache();
 
     HiveKey key = new HiveKey("key".getBytes(), "key".hashCode());
     BytesWritable value = new BytesWritable("value".getBytes());
@@ -60,10 +57,9 @@ public class TestHiveKVResultCache {
 
   @Test
   public void testSpilling() throws Exception {
-    HiveConf conf = new HiveConf();
-    HiveKVResultCache cache = new HiveKVResultCache(conf);
+    HiveKVResultCache cache = new HiveKVResultCache();
 
-    final int recordCount = HiveKVResultCache.IN_MEMORY_CACHE_SIZE * 3;
+    final int recordCount = HiveKVResultCache.IN_MEMORY_NUM_ROWS * 3;
 
     // Test using the same cache where first n rows are inserted then cache is cleared.
     // Next reuse the same cache and insert another m rows and verify the cache stores correctly.
@@ -104,10 +100,18 @@ public class TestHiveKVResultCache {
   @Test
   public void testResultList() throws Exception {
     scanAndVerify(10000, 0, 0, "a", "b");
+    scanAndVerify(10000, 511, 0, "a", "b");
+    scanAndVerify(10000, 511 * 2, 0, "a", "b");
+    scanAndVerify(10000, 511, 10, "a", "b");
+    scanAndVerify(10000, 511 * 2, 10, "a", "b");
     scanAndVerify(10000, 512, 0, "a", "b");
     scanAndVerify(10000, 512 * 2, 0, "a", "b");
-    scanAndVerify(10000, 512, 10, "a", "b");
-    scanAndVerify(10000, 512 * 2, 10, "a", "b");
+    scanAndVerify(10000, 512, 3, "a", "b");
+    scanAndVerify(10000, 512 * 6, 10, "a", "b");
+    scanAndVerify(10000, 512 * 7, 5, "a", "b");
+    scanAndVerify(10000, 512 * 9, 19, "a", "b");
+    scanAndVerify(10000, 1, 0, "a", "b");
+    scanAndVerify(10000, 1, 1, "a", "b");
   }
 
   private static void scanAndVerify(
@@ -176,8 +180,8 @@ public class TestHiveKVResultCache {
     // A queue to notify separateRowGenerator to generate the next batch of rows.
     private LinkedBlockingQueue<Boolean> queue;
 
-    MyHiveFunctionResultList(Configuration conf, Iterator inputIterator) {
-      super(conf, inputIterator);
+    MyHiveFunctionResultList(Iterator inputIterator) {
+      super(inputIterator);
     }
 
     void init(long rows, int threshold, int separate, String p1, String p2) {
@@ -258,8 +262,7 @@ public class TestHiveKVResultCache {
   private static long scanResultList(long rows, int threshold, int separate,
       List<Tuple2<HiveKey, BytesWritable>> output, String prefix1, String prefix2) {
     final long iteratorCount = threshold == 0 ? 1 : rows * (100 - separate) / 100 / threshold;
-    MyHiveFunctionResultList resultList = new MyHiveFunctionResultList(
-        new HiveConf(), new Iterator() {
+    MyHiveFunctionResultList resultList = new MyHiveFunctionResultList(new Iterator() {
       // Input record iterator, not used
       private int i = 0;
       @Override

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java?rev=1660640&r1=1660639&r2=1660640&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java Wed Feb 18 14:52:15 2015
@@ -85,7 +85,8 @@ public class LogDivertAppender extends W
     } else {
       // in non verbose mode, show only select logger messages
       String[] inclLoggerNames = { "org.apache.hadoop.mapreduce.JobSubmitter",
-          "org.apache.hadoop.mapreduce.Job", "SessionState", Task.class.getName() };
+          "org.apache.hadoop.mapreduce.Job", "SessionState", Task.class.getName(),
+          "org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor"};
       addFilter(new NameFilter(false, inclLoggerNames));
     }
   }

Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java?rev=1660640&r1=1660639&r2=1660640&view=diff
==============================================================================
--- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java (original)
+++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java Wed Feb 18 14:52:15 2015
@@ -18,7 +18,7 @@
 package org.apache.hive.spark.client;
 
 import java.io.Serializable;
-import java.net.URL;
+import java.net.URI;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
@@ -68,10 +68,10 @@ public interface SparkClient extends Ser
    * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
    * on that node (and not on the client machine).
    *
-   * @param url The location of the jar file.
+   * @param uri The location of the jar file.
    * @return A future that can be used to monitor the operation.
    */
-  Future<?> addJar(URL url);
+  Future<?> addJar(URI uri);
 
   /**
    * Adds a file to the running remote context.
@@ -80,10 +80,10 @@ public interface SparkClient extends Ser
    * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
    * on that node (and not on the client machine).
    *
-   * @param url The location of the file.
+   * @param uri The location of the file.
    * @return A future that can be used to monitor the operation.
    */
-  Future<?> addFile(URL url);
+  Future<?> addFile(URI uri);
 
   /**
    * Get the count of executors.
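
For callers, the practical effect of switching these methods from java.net.URL to java.net.URI is that locations such as hdfs: paths can be passed without a registered URL stream handler; SparkClientImpl (below) only ever calls toString() on the value. A minimal caller sketch, with made-up paths and timeout:

  import java.net.URI;
  import java.util.concurrent.TimeUnit;

  import org.apache.hive.spark.client.SparkClient;

  public class ClientResourceSketch {
    static void addResources(SparkClient client) throws Exception {
      // Both file: and hdfs: locations are representable as URIs.
      client.addJar(new URI("hdfs:///tmp/libs/my-udf.jar")).get(30, TimeUnit.SECONDS);
      client.addFile(new URI("file:/tmp/conf/extra.properties")).get(30, TimeUnit.SECONDS);
    }
  }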

Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1660640&r1=1660639&r2=1660640&view=diff
==============================================================================
--- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java (original)
+++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java Wed Feb 18 14:52:15 2015
@@ -17,6 +17,14 @@
 
 package org.apache.hive.spark.client;
 
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.concurrent.Promise;
@@ -30,14 +38,12 @@ import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.io.Serializable;
 import java.io.Writer;
-import java.net.URL;
+import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -49,14 +55,6 @@ import org.apache.spark.SparkException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
-import com.google.common.base.Strings;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 class SparkClientImpl implements SparkClient {
   private static final long serialVersionUID = 1L;
 
@@ -155,13 +153,13 @@ class SparkClientImpl implements SparkCl
   }
 
   @Override
-  public Future<?> addJar(URL url) {
-    return run(new AddJarJob(url.toString()));
+  public Future<?> addJar(URI uri) {
+    return run(new AddJarJob(uri.toString()));
   }
 
   @Override
-  public Future<?> addFile(URL url) {
-    return run(new AddFileJob(url.toString()));
+  public Future<?> addFile(URI uri) {
+    return run(new AddFileJob(uri.toString()));
   }
 
   @Override

Modified: hive/trunk/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java?rev=1660640&r1=1660639&r2=1660640&view=diff
==============================================================================
--- hive/trunk/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java (original)
+++ hive/trunk/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java Wed Feb 18 14:52:15 2015
@@ -22,7 +22,7 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.InputStream;
 import java.io.Serializable;
-import java.net.URL;
+import java.net.URI;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -204,7 +204,7 @@ public class TestSparkClient {
           jarFile.closeEntry();
           jarFile.close();
 
-          client.addJar(new URL("file:" + jar.getAbsolutePath()))
+          client.addJar(new URI("file:" + jar.getAbsolutePath()))
             .get(TIMEOUT, TimeUnit.SECONDS);
 
           // Need to run a Spark job to make sure the jar is added to the class loader. Monitoring
@@ -220,7 +220,7 @@ public class TestSparkClient {
           fileStream.write("test file".getBytes("UTF-8"));
           fileStream.close();
 
-          client.addJar(new URL("file:" + file.getAbsolutePath()))
+          client.addJar(new URI("file:" + file.getAbsolutePath()))
             .get(TIMEOUT, TimeUnit.SECONDS);
 
           // The same applies to files added with "addFile". They're only guaranteed to be available


