hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gunt...@apache.org
Subject hive git commit: HIVE-13133: Create initial InputFormat + record readers/writers (Gunther Hagleitner)
Date Wed, 24 Feb 2016 02:55:39 GMT
Repository: hive
Updated Branches:
  refs/heads/llap bc8de94ae -> bf834079a


HIVE-13133: Create initial InputFormat + record readers/writers (Gunther Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bf834079
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bf834079
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bf834079

Branch: refs/heads/llap
Commit: bf834079a3491bdcc65e1b839591f9db7098cf3b
Parents: bc8de94
Author: Gunther Hagleitner <gunther@apache.org>
Authored: Tue Feb 23 17:45:10 2016 -0800
Committer: Gunther Hagleitner <gunther@apache.org>
Committed: Tue Feb 23 18:53:46 2016 -0800

----------------------------------------------------------------------
 bin/ext/llapdump.sh                             |  31 +++
 bin/hive                                        |   4 +
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +
 .../test/resources/testconfiguration.properties |   1 +
 jdbc/pom.xml                                    |  11 +-
 .../src/java/org/apache/hive/jdbc/LlapDump.java | 136 ++++++++++
 .../org/apache/hive/jdbc/LlapInputFormat.java   | 174 ++++++++++++
 .../hadoop/hive/llap/LlapDataOutputBuffer.java  | 165 ++++++++++++
 .../hadoop/hive/llap/LlapInputFormat.java       | 249 +++++++++++++++++
 .../apache/hadoop/hive/llap/LlapInputSplit.java | 143 ++++++++++
 .../hadoop/hive/llap/LlapOutputFormat.java      |  60 +++++
 .../hive/llap/LlapOutputFormatService.java      | 141 ++++++++++
 .../hadoop/hive/llap/LlapRecordReader.java      |  86 ++++++
 .../hadoop/hive/llap/LlapRecordWriter.java      |  52 ++++
 .../hadoop/hive/ql/exec/FunctionRegistry.java   |   2 +
 .../hive/ql/exec/SerializationUtilities.java    |   2 +-
 .../hive/ql/exec/tez/HiveSplitGenerator.java    |  21 +-
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java |   2 +-
 .../hive/ql/optimizer/SimpleFetchOptimizer.java |  50 ++--
 .../ql/udf/generic/GenericUDFGetSplits.java     | 265 +++++++++++++++++++
 .../hadoop/hive/llap/TestLlapOutputFormat.java  | 124 +++++++++
 .../queries/clientpositive/udf_get_splits.q     |   6 +
 .../clientpositive/tez/udf_get_splits.q.out     |  73 +++++
 23 files changed, 1758 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/bin/ext/llapdump.sh
----------------------------------------------------------------------
diff --git a/bin/ext/llapdump.sh b/bin/ext/llapdump.sh
new file mode 100644
index 0000000..2564e82
--- /dev/null
+++ b/bin/ext/llapdump.sh
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+THISSERVICE=llapdump
+export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} "
+
+llapdump () {
+  CLASS=org.apache.hive.jdbc.LlapDump
+  HIVE_OPTS=''
+  execHiveCmd $CLASS "$@"
+}
+
+llapdump_help () {
+  echo "usage ./hive llapdump [-l <url>] [-u <user>] [-p <pwd>] <query>"
+  echo ""
+  echo "  --location (-l)  hs2 url"
+  echo "  --user (-u)      user name"
+  echo "  --pwd (-p)       password"
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/bin/hive
----------------------------------------------------------------------
diff --git a/bin/hive b/bin/hive
index e9477f7..e6693f6 100755
--- a/bin/hive
+++ b/bin/hive
@@ -48,6 +48,10 @@ while [ $# -gt 0 ]; do
       SERVICE=orcfiledump
       shift
       ;;
+    --llapdump)
+      SERVICE=llapdump
+      shift
+      ;;
     --help)
       HELP=_help
       shift

http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9cb626e..7fbcbba 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2684,6 +2684,8 @@ public class HiveConf extends Configuration {
         false,
         "Whether to setup split locations to match nodes on which llap daemons are running," +
             " instead of using the locations provided by the split itself"),
+    LLAP_DAEMON_OUTPUT_SERVICE_PORT("hive.llap.daemon.output.service.port", 15003,
+        "LLAP daemon output service port"),
 
     SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
       "60s", new TimeValidator(TimeUnit.SECONDS),

http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 58d0a45..deb9905 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -435,6 +435,7 @@ minitez.query.files=bucket_map_join_tez1.q,\
   tez_smb_main.q,\
   tez_smb_1.q,\
   tez_smb_empty.q,\
+  udf_get_splits.q,\
   vector_join_part_col_char.q,\
   vectorized_dynamic_partition_pruning.q,\
   tez_multi_union.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index f87ab59..2be8c30 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -42,14 +42,13 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
       <artifactId>hive-service</artifactId>
       <version>${project.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hive</groupId>
-            <artifactId>hive-exec</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>

http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java b/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
new file mode 100644
index 0000000..b0c0253
--- /dev/null
+++ b/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import java.io.OutputStream;
+import java.io.InputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.FileInputStream;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.RCFile.Reader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+import org.apache.hadoop.hive.llap.LlapRecordReader;
+import org.apache.hadoop.hive.metastore.api.Schema;
+
+public class LlapDump {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LlapDump.class);
+
+  private static String url = "jdbc:hive2://localhost:10000/default";
+  private static String user = "hive";
+  private static String pwd = "";
+  private static String query = "select * from test";
+
+  public static void main(String[] args) throws Exception {
+    Options opts = createOptions();
+    CommandLine cli = new GnuParser().parse(opts, args);
+
+    if (cli.hasOption('h')) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("orcfiledump", opts);
+      return;
+    }
+
+    if (cli.hasOption('l')) {
+      url = cli.getOptionValue("l");
+    }
+
+    if (cli.hasOption('u')) {
+      user = cli.getOptionValue("u");
+    }
+
+    if (cli.hasOption('p')) {
+      pwd = cli.getOptionValue("p");
+    }
+
+    if (cli.getArgs().length > 0) {
+      query = cli.getArgs()[0];
+    }
+
+    System.out.println("url: "+url);
+    System.out.println("user: "+user);
+    System.out.println("query: "+query);
+
+    LlapInputFormat format = new LlapInputFormat(url, user, pwd, query);
+    JobConf job = new JobConf();
+    InputSplit[] splits = format.getSplits(job, 1);
+    RecordReader<NullWritable, Text> reader = format.getRecordReader(splits[0], job, null);
+
+    if (reader instanceof LlapRecordReader) {
+      Schema schema = ((LlapRecordReader)reader).getSchema();
+      System.out.println(""+schema);
+    }
+    System.out.println("Results: ");
+    System.out.println("");
+
+    Text value = reader.createValue();
+    while (reader.next(NullWritable.get(), value)) {
+      System.out.println(value);
+    }
+  }
+
+  static Options createOptions() {
+    Options result = new Options();
+
+    result.addOption(OptionBuilder
+        .withLongOpt("location")
+        .withDescription("HS2 url")
+	.hasArg()
+        .create('l'));
+
+    result.addOption(OptionBuilder
+        .withLongOpt("user")
+        .withDescription("user name")
+        .hasArg()
+        .create('u'));
+
+    result.addOption(OptionBuilder
+        .withLongOpt("pwd")
+        .withDescription("password")
+        .hasArg()
+        .create('p'));
+
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java b/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
new file mode 100644
index 0000000..97fe2c5
--- /dev/null
+++ b/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.jdbc;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.sql.DriverManager;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataInputStream;
+import java.io.ByteArrayInputStream;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Progressable;
+
+import com.google.common.base.Preconditions;
+
+public class LlapInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
+
+  private static String driverName = "org.apache.hive.jdbc.HiveDriver";
+  private String url;  // "jdbc:hive2://localhost:10000/default"
+  private String user; // "hive",
+  private String pwd;  // ""
+  private String query;
+
+  private Connection con;
+  private Statement stmt;
+
+  public LlapInputFormat(String url, String user, String pwd, String query) {
+    this.url = url;
+    this.user = user;
+    this.pwd = pwd;
+    this.query = query;
+  }
+
+  public LlapInputFormat() {}
+
+  public class LlapInputSplit implements InputSplitWithLocationInfo {
+    InputSplitWithLocationInfo nativeSplit;
+    String inputFormatClassName;
+
+    @Override
+    public long getLength() throws IOException {
+      return nativeSplit.getLength();
+    }
+
+    @Override
+    public String[] getLocations() throws IOException {
+      return nativeSplit.getLocations();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeUTF(inputFormatClassName);
+      out.writeUTF(nativeSplit.getClass().toString());
+      nativeSplit.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      inputFormatClassName = in.readUTF();
+      String splitClass = in.readUTF();
+      try {
+        nativeSplit = (InputSplitWithLocationInfo)Class.forName(splitClass).newInstance();
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+      nativeSplit.readFields(in);
+    }
+
+    @Override
+    public SplitLocationInfo[] getLocationInfo() throws IOException {
+      return nativeSplit.getLocationInfo();
+    }
+
+    public InputSplit getSplit() {
+      return nativeSplit;
+    }
+
+    public InputFormat<NullWritable, V> getInputFormat() {
+      try {
+        return (InputFormat<NullWritable, V>) Class.forName(inputFormatClassName)
+            .newInstance();
+      } catch(Exception e) {
+        return null;
+      }
+    }
+  }
+
+  @Override
+  public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    LlapInputSplit llapSplit = (LlapInputSplit)split;
+    return llapSplit.getInputFormat().getRecordReader(llapSplit.getSplit(), job, reporter);
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    List<InputSplit> ins = new ArrayList<InputSplit>();
+
+    if (url == null || query == null) {
+      throw new IllegalStateException();
+    }
+
+    try {
+      Class.forName(driverName);
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+
+    try {
+      con = DriverManager.getConnection(url,user,pwd);
+      stmt = con.createStatement();
+      String sql = "select r.if_class as ic, r.split_class as sc, r.split as s from (select explode(get_splits(\""+query+"\","+numSplits+")) as r) t";
+      ResultSet res = stmt.executeQuery(sql);
+      while (res.next()) {
+        // deserialize split
+        DataInput in = new DataInputStream(new ByteArrayInputStream(res.getBytes(3)));
+        InputSplit is = (InputSplitWithLocationInfo)Class.forName(res.getString(2)).newInstance(); // todo setAccessible on ctor
+        is.readFields(in);
+        ins.add(is);
+      }
+
+      res.close();
+      stmt.close();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    return ins.toArray(new InputSplit[ins.size()]); // todo wrap input split with format
+  }
+
+  public void close() {
+    try {
+      con.close();
+    } catch (Exception e) {
+      // ignore
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/llap/LlapDataOutputBuffer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapDataOutputBuffer.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapDataOutputBuffer.java
new file mode 100644
index 0000000..aad8968
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapDataOutputBuffer.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.llap;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+/**
+ * A thread-not-safe version of Hadoop's DataOutputBuffer, which removes all
+ * synchronized modifiers.
+ */
+public class LlapDataOutputBuffer implements DataOutput {
+
+  int readOffset;
+  int writeOffset;
+  byte[] buffer;
+
+  /** Constructs a new empty buffer. */
+  public LlapDataOutputBuffer(int length) {
+    buffer = new byte[length];
+    reset();
+  }
+
+  /**
+   * Returns the current contents of the buffer. Data is only valid to
+   * {@link #getLength()}.
+   */
+  public byte[] getData() {
+    return buffer;
+  }
+
+  /** Returns the length of the valid data currently in the buffer. */
+  public int getLength() {
+    return (writeOffset - readOffset) % buffer.length;
+  }
+
+  /** Resets the buffer to empty. */
+  public LlapDataOutputBuffer reset() {
+    readOffset = 0;
+    writeOffset = 0;
+    return this;
+  }
+
+  /** Writes bytes from a DataInput directly into the buffer. */
+  public void write(DataInput in, int length) throws IOException {
+    //
+  }
+
+  @Override
+  public synchronized void write(int b) throws IOException {
+    while (readOffset == writeOffset) {
+      try {
+	wait();
+      } catch(InterruptedException e) {
+      }
+    }
+    buffer[writeOffset] = (byte)b;
+    writeOffset = (writeOffset + 1) % buffer.length;
+    notify();
+  }
+
+  public synchronized int read() throws IOException {
+    while (readOffset == writeOffset) {
+      try {
+	wait();
+      } catch(InterruptedException e) {
+      }
+    }
+    int b = buffer[readOffset];
+    readOffset = (readOffset + 1) % buffer.length;
+    notify();
+    return b;
+  }
+
+  @Override
+  public void write(byte b[], int off, int len) throws IOException {
+    while(len-- != 0) {
+      write(b[off++]);
+    }
+  }
+
+  @Override
+  public void write(byte b[]) throws IOException {
+    write(b, 0, b.length);
+  }
+
+
+  @Override
+  public void writeBoolean(boolean v) throws IOException {
+    write(v?1:0);
+  }
+
+  @Override
+  public void writeByte(int v) throws IOException  {
+    write(v);
+  }
+
+  @Override
+  public void writeChar(int v) throws IOException  {
+    write(v);
+  }
+
+  @Override
+  public void writeBytes(String v) throws IOException  {
+    write(v.getBytes(), 0, v.length());
+  }
+
+  @Override
+  public void writeChars(String v) throws IOException  {
+    write(v.getBytes(), 0, v.length());
+  }
+
+  @Override
+  public void writeDouble(double v) throws IOException  {
+    write(ByteBuffer.allocate(8).putDouble(v).array(),0,8);
+  }
+
+  @Override
+  public void writeFloat(float v) throws IOException  {
+    write(ByteBuffer.allocate(4).putFloat(v).array(),0,4);
+  }
+
+  @Override
+  public void writeInt(int v) throws IOException  {
+    write(v);
+    write(v>>>8);
+    write(v>>>16);
+    write(v>>>24);
+  }
+
+  @Override
+  public void writeLong(long v) throws IOException  {
+    int v1 = (int)v;
+    int v2 = (int)v>>>32;
+    write(v1);
+    write(v2);
+  }
+
+  @Override
+  public void writeShort(int v) throws IOException  {
+    write(v);
+    write(v>>>8);
+  }
+
+  @Override
+  public void writeUTF(String v) throws IOException  {
+    write(v.getBytes(), 0, v.length());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
new file mode 100644
index 0000000..4db4d32
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import javax.security.auth.login.LoginException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.Socket;
+
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+
+import com.esotericsoftware.kryo.Kryo;
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.io.InputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.io.FileNotFoundException;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.runtime.api.Event;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
+import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputInitializer;
+import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.InputSpecUpdate;
+import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+
+
+import com.google.common.base.Preconditions;
+
+public class LlapInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LlapInputFormat.class);
+
+  private TezWork work;
+  private Schema schema;
+
+  public LlapInputFormat(TezWork tezWork, Schema schema) {
+    this.work = tezWork;
+    this.schema = schema;
+  }
+
+  // need empty constructor for bean instantiation
+  public LlapInputFormat() {}
+
+  /*
+   * This proxy record reader has the duty of establishing a connected socket with LLAP, then fire
+   * off the work in the split to LLAP and finally return the connected socket back in an
+   * LlapRecordReader. The LlapRecordReader class reads the results from the socket.
+   */
+  public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+
+    LlapInputSplit llapSplit = (LlapInputSplit)split;
+
+    // TODO: push event into LLAP
+
+    // this is just the portion that sets up the io to receive data
+    String host = split.getLocations()[0];
+    String id = job.get(LlapOutputFormat.LLAP_OF_ID_KEY);
+
+    HiveConf conf = new HiveConf();
+    Socket socket = new Socket(host,
+        conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT));
+
+    LOG.debug("Socket connected");
+
+    socket.getOutputStream().write(id.getBytes());
+    socket.getOutputStream().write(0);
+    socket.getOutputStream().flush();
+
+    LOG.debug("Registered id: " + id);
+
+    return new LlapRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class);
+  }
+
+  /*
+   * getSplits() gets called as part of the GenericUDFGetSplits call to get splits. Here we create
+   * an array of input splits from the work item we have, figure out the location for llap and pass
+   * that back for the submission. getRecordReader method above uses that split info to assign the
+   * work to llap.
+   */
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    // TODO: need to build proto of plan
+
+    DAG dag = DAG.create(work.getName());
+    dag.setCredentials(job.getCredentials());
+    // TODO: set access control? TezTask.setAccessControlsForCurrentUser(dag);
+
+    DagUtils utils = DagUtils.getInstance();
+    Context ctx = new Context(job);
+    MapWork mapWork = (MapWork) work.getAllWork().get(0);
+    // bunch of things get setup in the context based on conf but we need only the MR tmp directory
+    // for the following method.
+    JobConf wxConf = utils.initializeVertexConf(job, ctx, mapWork);
+    Path scratchDir = utils.createTezDir(ctx.getMRScratchDir(), job);
+    FileSystem fs = scratchDir.getFileSystem(job);
+    try {
+      LocalResource appJarLr = createJarLocalResource(utils.getExecJarPathLocal(), utils, job);
+      Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, appJarLr,
+          new ArrayList<LocalResource>(), fs, ctx, false, work,
+          work.getVertexType(mapWork));
+      dag.addVertex(wx);
+      utils.addCredentials(mapWork, dag);
+
+      // we have the dag now proceed to get the splits:
+      HiveSplitGenerator splitGenerator = new HiveSplitGenerator(null);
+      splitGenerator.initializeSplitGenerator(wxConf, mapWork);
+      List<Event> eventList = splitGenerator.initialize();
+
+      // hack - just serializing with kryo for now. This needs to be done properly
+      InputSplit[] result = new InputSplit[eventList.size()];
+      int i = 0;
+      ByteArrayOutputStream bos = new ByteArrayOutputStream(10240);
+
+      InputConfigureVertexTasksEvent configureEvent = (InputConfigureVertexTasksEvent)
+	eventList.remove(0);
+
+      List<TaskLocationHint> hints = configureEvent.getLocationHint().getTaskLocationHints();
+      for (Event event: eventList) {
+	TaskLocationHint hint = hints.remove(0);
+        Set<String> hosts = hint.getHosts();
+	SplitLocationInfo[] locations = new SplitLocationInfo[hosts.size()];
+
+	int j = 0;
+	for (String host: hosts) {
+	  locations[j++] = new SplitLocationInfo(host,false);
+	}
+
+	bos.reset();
+	Kryo kryo = SerializationUtilities.borrowKryo();
+	SerializationUtilities.serializeObjectByKryo(kryo, event, bos);
+	SerializationUtilities.releaseKryo(kryo);
+	result[i++] = new LlapInputSplit(bos.toByteArray(), locations, schema);
+      }
+      return result;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Returns a local resource representing a jar. This resource will be used to execute the plan on
+   * the cluster.
+   *
+   * @param localJarPath
+   *          Local path to the jar to be localized.
+   * @return LocalResource corresponding to the localized hive exec resource.
+   * @throws IOException
+   *           when any file system related call fails.
+   * @throws LoginException
+   *           when we are unable to determine the user.
+   * @throws URISyntaxException
+   *           when current jar location cannot be determined.
+   */
+  private LocalResource createJarLocalResource(String localJarPath, DagUtils utils,
+      Configuration conf)
+    throws IOException, LoginException, IllegalArgumentException, FileNotFoundException {
+    FileStatus destDirStatus = utils.getHiveJarDirectory(conf);
+    assert destDirStatus != null;
+    Path destDirPath = destDirStatus.getPath();
+
+    Path localFile = new Path(localJarPath);
+    String sha = getSha(localFile, conf);
+
+    String destFileName = localFile.getName();
+
+    // Now, try to find the file based on SHA and name. Currently we require exact name match.
+    // We could also allow cutting off versions and other stuff provided that SHA matches...
+    destFileName = FilenameUtils.removeExtension(destFileName) + "-" + sha
+      + FilenameUtils.EXTENSION_SEPARATOR + FilenameUtils.getExtension(destFileName);
+
+    // TODO: if this method is ever called on more than one jar, getting the dir and the
+    // list need to be refactored out to be done only once.
+    Path destFile = new Path(destDirPath.toString() + "/" + destFileName);
+    return utils.localizeResource(localFile, destFile, LocalResourceType.FILE, conf);
+  }
+
+  private String getSha(Path localFile, Configuration conf)
+    throws IOException, IllegalArgumentException {
+    InputStream is = null;
+    try {
+      FileSystem localFs = FileSystem.getLocal(conf);
+      is = localFs.open(localFile);
+      return DigestUtils.sha256Hex(is);
+    } finally {
+      if (is != null) {
+        is.close();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
new file mode 100644
index 0000000..78dbb34
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataInputStream;
+import java.io.ByteArrayInputStream;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.AutoExpandingBufferWriteTransport;
+import org.apache.thrift.transport.AutoExpandingBuffer;
+
+import com.google.common.base.Preconditions;
+
+public class LlapInputSplit implements InputSplitWithLocationInfo {
+
+  byte[] queryFragment;
+  SplitLocationInfo[] locations;
+  Schema schema;
+
+  public LlapInputSplit() {}
+
+  public LlapInputSplit(byte[] queryFragment, SplitLocationInfo[] locations, Schema schema) {
+    this.queryFragment = queryFragment;
+    this.locations = locations;
+    this.schema = schema;
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  @Override
+  public long getLength() throws IOException {
+    return 0;
+  }
+
+  @Override
+  public String[] getLocations() throws IOException {
+    String[] locs = new String[locations.length];
+    for (int i = 0; i < locations.length; ++i) {
+      locs[i] = locations[i].getLocation();
+    }
+    return locs;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(queryFragment.length);
+    out.write(queryFragment);
+
+    out.writeInt(locations.length);
+    for (int i = 0; i < locations.length; ++i) {
+      out.writeUTF(locations[i].getLocation());
+    }
+
+    byte[] binarySchema;
+
+    try {
+      AutoExpandingBufferWriteTransport transport = new AutoExpandingBufferWriteTransport(1024, 2d);
+      TProtocol protocol = new TBinaryProtocol(transport);
+      schema.write(protocol);
+      binarySchema = transport.getBuf().array();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+
+    out.writeInt(binarySchema.length);
+    out.write(binarySchema);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    byte[] queryFragment;
+
+    int length = in.readInt();
+    queryFragment = new byte[length];
+    in.readFully(queryFragment);
+
+    length = in.readInt();
+    locations = new SplitLocationInfo[length];
+
+    for (int i = 0; i < length; ++i) {
+      locations[i] = new SplitLocationInfo(in.readUTF(), false);
+    }
+
+    length = in.readInt();
+
+    try {
+      AutoExpandingBufferWriteTransport transport = new AutoExpandingBufferWriteTransport(length, 2d);
+      AutoExpandingBuffer buf = transport.getBuf();
+      in.readFully(buf.array(), 0, length);
+
+      TProtocol protocol = new TBinaryProtocol(transport);
+      schema = new Schema();
+      schema.read(protocol);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public SplitLocationInfo[] getLocationInfo() throws IOException {
+    return locations;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormat.java
new file mode 100644
index 0000000..8e98aba
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormat.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+
+import com.google.common.base.Preconditions;
+
+/**
+ *
+ */
+public class LlapOutputFormat<K extends Writable, V extends Writable>
+  implements OutputFormat<K, V> {
+
+  public static final String LLAP_OF_ID_KEY = "llap.of.id";
+
+  @Override
+  public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
+  }
+
+  @Override
+  public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException {
+    if (!LlapProxy.isDaemon()) {
+      throw new IOException("LlapOutputFormat can only be used inside Llap");
+    }
+    try {
+      return LlapOutputFormatService.get().<K,V>getWriter(job.get(LLAP_OF_ID_KEY));
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
new file mode 100644
index 0000000..4f38ff1
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ *
+ */
+public class LlapOutputFormatService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LlapOutputFormat.class);
+
+  private static LlapOutputFormatService service;
+  private final Map<String, RecordWriter> writers;
+  private final ServerSocket socket;
+  private final HiveConf conf;
+  private final ExecutorService executor;
+  private static final int WAIT_TIME = 5;
+
+  private LlapOutputFormatService() throws IOException {
+    writers = new HashMap<String, RecordWriter>();
+    conf = new HiveConf();
+    executor = Executors.newSingleThreadExecutor(
+      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LLAP output %d").build());
+    socket = new ServerSocket(
+      conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT));
+  }
+
+  public static LlapOutputFormatService get() throws IOException {
+    if (service == null) {
+      service = new LlapOutputFormatService();
+      service.start();
+    }
+    return service;
+  }
+
+  public void start() throws IOException {
+    executor.submit(new Runnable() {
+      byte[] buffer = new byte[4096];
+      @Override
+      public void run() {
+	while (true) {
+	  Socket s = null;
+	  try {
+	    s = socket.accept();
+	    String id = readId(s);
+	    LOG.debug("Received: "+id);
+	    registerReader(s, id);
+	  } catch (IOException io) {
+	    if (s != null) {
+	      try{
+		s.close();
+	      } catch (IOException io2) {
+		// ignore
+	      }
+	    }
+	  }
+	}
+      }
+
+    private String readId(Socket s) throws IOException {
+      InputStream in = s.getInputStream();
+      int idx = 0;
+      while((buffer[idx++] = (byte)in.read()) != '\0') {}
+      return new String(buffer,0,idx-1);
+    }
+
+    private void registerReader(Socket s, String id) throws IOException {
+      synchronized(service) {
+	LOG.debug("registering socket for: "+id);
+	LlapRecordWriter writer = new LlapRecordWriter(s.getOutputStream());
+        writers.put(id, writer);
+        service.notifyAll();
+      }
+    }
+    }
+    );
+  }
+
+  public void stop() throws IOException, InterruptedException {
+    executor.shutdown();
+    executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
+    socket.close();
+  }
+
+  public <K,V> RecordWriter<K, V> getWriter(String id) throws IOException, InterruptedException {
+    RecordWriter writer = null;
+    synchronized(service) {
+      while ((writer = writers.get(id)) == null) {
+	LOG.debug("Waiting for writer for: "+id);
+	service.wait();
+      }
+    }
+    return writer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java
new file mode 100644
index 0000000..ce3d39a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.DataInputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.RCFile.Reader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hive.metastore.api.Schema;
+
+public class LlapRecordReader<V extends WritableComparable> implements RecordReader<NullWritable, V> {
+
+  DataInputStream din;
+  Schema schema;
+  Class<V> clazz;
+
+  public LlapRecordReader(InputStream in, Schema schema, Class<V> clazz) {
+    din = new DataInputStream(in);
+    this.schema = schema;
+    this.clazz = clazz;
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  @Override
+  public void close() throws IOException {
+    din.close();
+  }
+
+  @Override
+  public long getPos() { return 0; }
+
+  @Override
+  public float getProgress() { return 0f; }
+
+  @Override
+  public NullWritable createKey() {
+    return NullWritable.get();
+  }
+
+  @Override
+  public V createValue() {
+    try {
+      return clazz.newInstance();
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  @Override
+  public boolean next(NullWritable key, V value) {
+    try {
+      value.readFields(din);
+      return true;
+    } catch (IOException io) {
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java
new file mode 100644
index 0000000..4d1996c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.DataOutputStream;;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
+
+public class LlapRecordWriter<K extends Writable, V extends WritableComparable>
+  implements RecordWriter<K,V> {
+
+  DataOutputStream dos;
+
+  public LlapRecordWriter(OutputStream out) {
+    dos = new DataOutputStream(out);
+  }
+
+  @Override
+  public void close(Reporter reporter) throws IOException {
+    dos.close();
+  }
+
+  @Override
+  public void write(K key, V value) throws IOException {
+    value.write(dos);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index e0e030f..f3afa24 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -344,6 +344,8 @@ public final class FunctionRegistry {
     system.registerGenericUDF("ewah_bitmap_or", GenericUDFEWAHBitmapOr.class);
     system.registerGenericUDF("ewah_bitmap_empty", GenericUDFEWAHBitmapEmpty.class);
 
+    system.registerGenericUDF("get_splits", GenericUDFGetSplits.class);
+
     // Aliases for Java Class Names
     // These are used in getImplicitConvertUDFMethod
     system.registerUDF(serdeConstants.BOOLEAN_TYPE_NAME, UDFToBoolean.class, false, UDFToBoolean.class.getSimpleName());

http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
index b05a79e..eaa4293 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
@@ -570,7 +570,7 @@ public class SerializationUtilities {
    * @param plan Usually of type MapredWork, MapredLocalWork etc.
    * @param out stream in which serialized plan is written into
    */
-  private static void serializeObjectByKryo(Kryo kryo, Object plan, OutputStream out) {
+  public static void serializeObjectByKryo(Kryo kryo, Object plan, OutputStream out) {
     Output output = new Output(out);
     kryo.setClassLoader(Utilities.getSessionSpecifiedClassLoader());
     kryo.writeObject(output, plan);

http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index 8e48c2e..b0cda82 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -74,17 +74,26 @@ public class HiveSplitGenerator extends InputInitializer {
 
   private static final Logger LOG = LoggerFactory.getLogger(HiveSplitGenerator.class);
 
-  private final DynamicPartitionPruner pruner;
-  private final Configuration conf;
-  private final JobConf jobConf;
-  private final MRInputUserPayloadProto userPayloadProto;
-  private final MapWork work;
+  private DynamicPartitionPruner pruner = null;
+  private Configuration conf = null;
+  private JobConf jobConf = null;
+  private MRInputUserPayloadProto userPayloadProto = null;
+  private MapWork work = null;
   private final SplitGrouper splitGrouper = new SplitGrouper();
-  private final SplitLocationProvider splitLocationProvider;
+  private SplitLocationProvider splitLocationProvider = null;
+
+  public void initializeSplitGenerator(Configuration conf, MapWork work) {
+    this.conf = conf;
+    this.work = work;
+    this.jobConf = new JobConf(conf);
+  }
 
   public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOException,
       SerDeException {
     super(initializerContext);
+    if (initializerContext == null) {
+      return;
+    }
     Preconditions.checkNotNull(initializerContext);
     userPayloadProto =
         MRInputHelpers.parseMRInputPayload(initializerContext.getInputUserPayload());

http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 83defea..9e688ea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -412,7 +412,7 @@ public class TezTask extends Task<TezWork> {
     return dag;
   }
 
-  private void setAccessControlsForCurrentUser(DAG dag) {
+  public static void setAccessControlsForCurrentUser(DAG dag) {
     // get current user
     String currentUser = SessionState.getUserFromAuthenticator();
     if(LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
index b5ceb14..ca8dccf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.LimitOperator;
+import org.apache.hadoop.hive.ql.exec.UDTFOperator;
 import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -140,35 +141,31 @@ public class SimpleFetchOptimizer extends Transform {
   }
 
   private boolean checkThreshold(FetchData data, int limit, ParseContext pctx) throws Exception {
+    boolean result = false;
+
     if (limit > 0) {
       if (data.hasOnlyPruningFilter()) {
         /* partitioned table + query has only pruning filters */
-        return true;
+        result = true;
       } else if (data.isPartitioned() == false && data.isFiltered() == false) {
         /* unpartitioned table + no filters */
-        return true;
+        result = true;
       }
       /* fall through */
-    }
-    long threshold = HiveConf.getLongVar(pctx.getConf(),
-        HiveConf.ConfVars.HIVEFETCHTASKCONVERSIONTHRESHOLD);
-    if (threshold < 0) {
-      return true;
-    }
-    Operator child = data.scanOp.getChildOperators().get(0);
-    if(child instanceof SelectOperator) {
-      // select *, constant and casts can be allowed without a threshold check
-      if (checkExpressions((SelectOperator)child)) {
-        return true;
+    } else {
+      long threshold = HiveConf.getLongVar(pctx.getConf(),
+	  HiveConf.ConfVars.HIVEFETCHTASKCONVERSIONTHRESHOLD);
+      if (threshold < 0) {
+	result = true;
+      } else {
+	long remaining = threshold;
+	remaining -= data.getInputLength(pctx, remaining);
+	if (remaining >= 0) {
+	  result = true;
+	}
       }
     }
-    long remaining = threshold;
-    remaining -= data.getInputLength(pctx, remaining);
-    if (remaining < 0) {
-      LOG.info("Threshold " + remaining + " exceeded for pseudoMR mode");
-      return false;
-    }
-    return true;
+    return result;
   }
 
   // all we can handle is LimitOperator, FilterOperator SelectOperator and final FS
@@ -187,23 +184,20 @@ public class SimpleFetchOptimizer extends Transform {
       return null;
     }
     Table table = ts.getConf().getTableMetadata();
-    if (table == null) {
-      return null;
-    }
     ReadEntity parent = PlanUtils.getParentViewInfo(alias, pctx.getViewAliasToInput());
-    if (!table.isPartitioned()) {
+    if (table != null && !table.isPartitioned()) {
       FetchData fetch = new FetchData(ts, parent, table, splitSample);
       return checkOperators(fetch, aggressive, false);
     }
 
     boolean bypassFilter = false;
-    if (HiveConf.getBoolVar(pctx.getConf(), HiveConf.ConfVars.HIVEOPTPPD)) {
+    if (table != null && HiveConf.getBoolVar(pctx.getConf(), HiveConf.ConfVars.HIVEOPTPPD)) {
       ExprNodeDesc pruner = pctx.getOpToPartPruner().get(ts);
       if (PartitionPruner.onlyContainsPartnCols(table, pruner)) {
         bypassFilter = !pctx.getPrunedPartitions(alias, ts).hasUnknownPartitions();
       }
     }
-    if (!aggressive && !bypassFilter) {
+    if (table != null && !aggressive && !bypassFilter) {
       return null;
     }
     PrunedPartitionList partitions = pctx.getPrunedPartitions(alias, ts);
@@ -231,7 +225,7 @@ public class SimpleFetchOptimizer extends Transform {
         continue;
       }
 
-      if (!(op instanceof LimitOperator || (op instanceof FilterOperator && bypassFilter))) {
+      if (!(op instanceof LimitOperator || (op instanceof FilterOperator && bypassFilter)) || op instanceof UDTFOperator) {
         break;
       }
 
@@ -289,7 +283,7 @@ public class SimpleFetchOptimizer extends Transform {
 
   private boolean isConvertible(FetchData fetch, Operator<?> operator, Set<Operator<?>> traversed) {
     if (operator instanceof ReduceSinkOperator || operator instanceof CommonJoinOperator
-        || operator instanceof ScriptOperator || operator instanceof UDTFOperator) {
+        || operator instanceof ScriptOperator) {
       return false;
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
new file mode 100644
index 0000000..3b7dcd9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.io.Serializable;
+import java.io.IOException;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.DataOutput;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hive.ql.udf.UDFType;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.llap.LlapInputFormat;
+import org.apache.hadoop.hive.llap.LlapOutputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.metastore.api.Schema;
+
+
+/**
+ * GenericUDFGetSplits.
+ *
+ */
+@Description(name = "get_splits", value = "_FUNC_(string,int) - "
+    + "Returns an array of length int serialized splits for the referenced tables string.")
+@UDFType(deterministic = false)
+public class GenericUDFGetSplits extends GenericUDF {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GenericUDFGetSplits.class);
+
+  private transient StringObjectInspector stringOI;
+  private transient IntObjectInspector intOI;
+  private final ArrayList<Object> retArray = new ArrayList<Object>();
+  private transient JobConf jc;
+  private transient Hive db;
+  private ByteArrayOutputStream bos;
+  private DataOutput dos;
+
+  @Override
+  public ObjectInspector initialize(ObjectInspector[] arguments)
+    throws UDFArgumentException {
+
+    LOG.debug("initializing GenericUDFGetSplits");
+
+    try {
+      if (SessionState.get() != null && SessionState.get().getConf() != null) {
+        HiveConf conf = SessionState.get().getConf();
+        jc = new JobConf(conf);
+        db = Hive.get(conf);
+      } else {
+        jc = MapredContext.get().getJobConf();
+        db = Hive.get();
+      }
+    } catch(HiveException e) {
+      LOG.error("Failed to initialize: ",e);
+      throw new UDFArgumentException(e);
+    }
+
+    LOG.debug("Initialized conf, jc and metastore connection");
+
+    if (arguments.length != 2) {
+      throw new UDFArgumentLengthException("The function GET_SPLITS accepts 2 arguments.");
+    } else if (!(arguments[0] instanceof StringObjectInspector)) {
+      LOG.error("Got "+arguments[0].getTypeName()+" instead of string.");
+      throw new UDFArgumentTypeException(0, "\""
+	  + "string\" is expected at function GET_SPLITS, " + "but \""
+	  + arguments[0].getTypeName() + "\" is found");
+    } else if (!(arguments[1] instanceof IntObjectInspector)) {
+      LOG.error("Got "+arguments[1].getTypeName()+" instead of int.");
+      throw new UDFArgumentTypeException(1, "\""
+	  + "int\" is expected at function GET_SPLITS, " + "but \""
+	  + arguments[1].getTypeName() + "\" is found");
+    }
+
+    stringOI = (StringObjectInspector) arguments[0];
+    intOI = (IntObjectInspector) arguments[1];
+
+    List<String> names = Arrays.asList("if_class","split_class","split");
+    List<ObjectInspector> fieldOIs = Arrays.<ObjectInspector>asList(
+								    PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+								    PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+								    PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector);
+    ObjectInspector outputOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, fieldOIs);
+    ObjectInspector listOI = ObjectInspectorFactory.getStandardListObjectInspector(outputOI);
+    bos = new ByteArrayOutputStream(1024);
+    dos = new DataOutputStream(bos);
+
+    LOG.debug("done initializing GenericUDFGetSplits");
+    return listOI;
+  }
+
+  @Override
+  public Object evaluate(DeferredObject[] arguments) throws HiveException {
+    retArray.clear();
+
+    String query = stringOI.getPrimitiveJavaObject(arguments[0].get());
+
+    int num = intOI.get(arguments[1].get());
+
+    Driver driver = new Driver();
+    CommandProcessorResponse cpr;
+
+    HiveConf conf = SessionState.get().getConf();
+
+    if (conf == null) {
+      throw new HiveException("Need configuration");
+    }
+
+    LOG.info("setting fetch.task.conversion to none and query file format to \""+LlapOutputFormat.class.toString()+"\"");
+    HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, "none");
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, LlapOutputFormat.class.toString());
+
+    cpr = driver.compileAndRespond(query);
+    if(cpr.getResponseCode() != 0) {
+      throw new HiveException("Failed to compile query: "+cpr.getException());
+    }
+
+    QueryPlan plan = driver.getPlan();
+    List<Task<?>> roots = plan.getRootTasks();
+    Schema schema = plan.getResultSchema();
+
+    if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) {
+      throw new HiveException("Was expecting a single TezTask.");
+    }
+
+    Path data = null;
+    InputFormat inp = null;
+    String ifc = null;
+
+    TezWork tezWork = ((TezTask)roots.get(0)).getWork();
+
+    if (tezWork.getAllWork().size() != 1) {
+
+      String tableName = "table_"+UUID.randomUUID().toString().replaceAll("[^A-Za-z0-9 ]", "");
+
+      String ctas = "create temporary table "+tableName+" as "+query;
+      LOG.info("CTAS: "+ctas);
+
+      try {
+        cpr = driver.run(ctas, false);
+      } catch(CommandNeedRetryException e) {
+        throw new HiveException(e);
+      }
+
+      if(cpr.getResponseCode() != 0) {
+        throw new HiveException("Failed to create temp table: " + cpr.getException());
+      }
+
+      query = "select * from " + tableName;
+      cpr = driver.compileAndRespond(query);
+      if(cpr.getResponseCode() != 0) {
+        throw new HiveException("Failed to create temp table: "+cpr.getException());
+      }
+
+      plan = driver.getPlan();
+      roots = plan.getRootTasks();
+      schema = plan.getResultSchema();
+
+      if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) {
+        throw new HiveException("Was expecting a single TezTask.");
+      }
+
+      tezWork = ((TezTask)roots.get(0)).getWork();
+
+      // Table table = db.getTable(tableName);
+      // if (table.isPartitioned()) {
+      //   throw new UDFArgumentException("Table " + tableName + " is partitioned.");
+      // }
+      // data = table.getDataLocation();
+      // LOG.info("looking at: "+data);
+
+      // ifc = table.getInputFormatClass().toString();
+
+      // inp = ReflectionUtils.newInstance(table.getInputFormatClass(), jc);
+    }
+
+    MapWork w = (MapWork)tezWork.getAllWork().get(0);
+    inp = new LlapInputFormat(tezWork, schema);
+    ifc = LlapInputFormat.class.toString();
+
+    try {
+      if (inp instanceof JobConfigurable) {
+        ((JobConfigurable) inp).configure(jc);
+      }
+
+      if (inp instanceof FileInputFormat) {
+        ((FileInputFormat) inp).addInputPath(jc, data);
+      }
+
+      for (InputSplit s: inp.getSplits(jc, num)) {
+        Object[] os = new Object[3];
+        os[0] = ifc;
+        os[1] = s.getClass().toString();
+        bos.reset();
+        s.write(dos);
+        byte[] frozen = bos.toByteArray();
+        os[2] = frozen;
+        retArray.add(os);
+      }
+    } catch(Exception e) {
+      throw new HiveException(e);
+    }
+
+    return retArray;
+  }
+
+  @Override
+  public String getDisplayString(String[] children) {
+    assert children.length == 2;
+    return getStandardDisplayString("get_splits", children);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
new file mode 100644
index 0000000..c49231c
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.Before;
+import org.junit.After;
+
+import java.net.Socket;
+
+import java.io.OutputStream;
+import java.io.InputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.FileInputStream;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.RCFile.Reader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+
+
+public class TestLlapOutputFormat {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestLlapOutputFormat.class);
+
+  private LlapOutputFormatService service;
+
+  @Before
+  public void setUp() throws IOException {
+    LOG.debug("Setting up output service");
+    service = LlapOutputFormatService.get();
+    LlapProxy.setDaemon(true);
+    LOG.debug("Output service up");
+  }
+
+  @After
+  public void tearDown() throws IOException, InterruptedException {
+    LOG.debug("Tearing down service");
+    service.stop();
+    LOG.debug("Tearing down complete");
+  }
+
+  @Test
+  public void testValues() throws Exception {
+    JobConf job = new JobConf();
+    job.set(LlapOutputFormat.LLAP_OF_ID_KEY, "foobar");
+    LlapOutputFormat format = new LlapOutputFormat();
+
+    HiveConf conf = new HiveConf();
+    Socket socket = new Socket("localhost",
+        conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT));
+
+    LOG.debug("Socket connected");
+
+    socket.getOutputStream().write("foobar".getBytes());
+    socket.getOutputStream().write(0);
+    socket.getOutputStream().flush();
+
+    Thread.sleep(3000);
+
+    LOG.debug("Data written");
+
+    RecordWriter<NullWritable, Text> writer = format.getRecordWriter(null, job, null, null);
+    Text text = new Text();
+
+    LOG.debug("Have record writer");
+
+    for (int i = 0; i < 10; ++i) {
+      text.set(""+i);
+      writer.write(NullWritable.get(),text);
+    }
+
+    writer.close(null);
+
+    InputStream in = socket.getInputStream();
+    RecordReader reader = new LlapRecordReader(in, null, Text.class);
+
+    LOG.debug("Have record reader");
+
+    int count = 0;
+    while(reader.next(NullWritable.get(), text)) {
+      LOG.debug(text.toString());
+      count++;
+    }
+
+    Assert.assertEquals(count,10);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/test/queries/clientpositive/udf_get_splits.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/udf_get_splits.q b/ql/src/test/queries/clientpositive/udf_get_splits.q
new file mode 100644
index 0000000..70400e8
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/udf_get_splits.q
@@ -0,0 +1,6 @@
+set hive.fetch.task.conversion=more;
+
+DESCRIBE FUNCTION get_splits;
+DESCRIBE FUNCTION EXTENDED get_splits;
+
+select r.if_class as ic, r.split_class as sc, hash(r.split) as h, length(r.split) as l from (select explode(get_splits("select key, count(*) from src where key % 2 = 0 group by key",5)) as r) t;

http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/test/results/clientpositive/tez/udf_get_splits.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/tez/udf_get_splits.q.out b/ql/src/test/results/clientpositive/tez/udf_get_splits.q.out
new file mode 100644
index 0000000..c8ebe88
--- /dev/null
+++ b/ql/src/test/results/clientpositive/tez/udf_get_splits.q.out
@@ -0,0 +1,73 @@
+PREHOOK: query: DESCRIBE FUNCTION get_splits
+PREHOOK: type: DESCFUNCTION
+POSTHOOK: query: DESCRIBE FUNCTION get_splits
+POSTHOOK: type: DESCFUNCTION
+get_splits(string,int) - Returns an array of length int serialized splits for the referenced tables string.
+PREHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
+PREHOOK: type: DESCFUNCTION
+POSTHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
+POSTHOOK: type: DESCFUNCTION
+get_splits(string,int) - Returns an array of length int serialized splits for the referenced tables string.
+PREHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@table_74b4da448d74412d8fc0da37a405efb3
+POSTHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@table_74b4da448d74412d8fc0da37a405efb3
+PREHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@table_4c8d9e53ea514617a6a72158c9c843c5
+POSTHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@table_4c8d9e53ea514617a6a72158c9c843c5
+PREHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@table_42dcc08d004e411f8038701980b491e3
+POSTHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@table_42dcc08d004e411f8038701980b491e3
+PREHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@table_9ef6460ac4a24e4fab1ea09ad94b01e3
+POSTHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@table_9ef6460ac4a24e4fab1ea09ad94b01e3
+PREHOOK: query: select r.if_class as ic, r.split_class as sc, hash(r.split) as h, length(r.split) as l from (select explode(get_splits("select key, count(*) from src where key % 2 = 0 group by key",5)) as r) t
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@table_bf88922034334ab08a9abb6cf8aa546e
+POSTHOOK: query: select r.if_class as ic, r.split_class as sc, hash(r.split) as h, length(r.split) as l from (select explode(get_splits("select key, count(*) from src where key % 2 = 0 group by key",5)) as r) t
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@table_bf88922034334ab08a9abb6cf8aa546e
+PREHOOK: query: select r.if_class as ic, r.split_class as sc, hash(r.split) as h, length(r.split) as l from (select explode(get_splits("select key, count(*) from src where key % 2 = 0 group by key",5)) as r) t
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+POSTHOOK: query: select r.if_class as ic, r.split_class as sc, hash(r.split) as h, length(r.split) as l from (select explode(get_splits("select key, count(*) from src where key % 2 = 0 group by key",5)) as r) t
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+class org.apache.hadoop.mapred.TextInputFormat	class org.apache.hadoop.mapred.FileSplit	-1434872849	218
+class org.apache.hadoop.mapred.TextInputFormat	class org.apache.hadoop.mapred.FileSplit	2107621793	218
+class org.apache.hadoop.mapred.TextInputFormat	class org.apache.hadoop.mapred.FileSplit	-1988206222	218
+class org.apache.hadoop.mapred.TextInputFormat	class org.apache.hadoop.mapred.FileSplit	1357774915	218
+class org.apache.hadoop.mapred.TextInputFormat	class org.apache.hadoop.mapred.FileSplit	605302265	218


Mime
View raw message