hive-commits mailing list archives

From na...@apache.org
Subject svn commit: r918793 - in /hadoop/hive/trunk: ./ ql/src/java/org/apache/hadoop/hive/ql/io/ ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/ ql/src/test/queries/clientpositive/ ql/src/test/results/clientpositive/
Date Thu, 04 Mar 2010 00:34:13 GMT
Author: namit
Date: Thu Mar  4 00:34:13 2010
New Revision: 918793

URL: http://svn.apache.org/viewvc?rev=918793&view=rev
Log:
HIVE-1197. Add BucketizedHiveInputFormat
(Siying Dong via namit)
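For reference, the new input format is enabled per session by pointing
hive.input.format at the new class, exactly as the clientpositive test added
below does. A minimal sketch (the COUNT query is only an illustration; T1 is
the table created in that test):

    set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
    set mapred.min.split.size=64;
    set dfs.block.size=64;

    -- Even with the tiny split and block sizes above, all splits coming from
    -- one input file of T1 are grouped into a single wrapper split, so each
    -- file is read by exactly one mapper.
    SELECT COUNT(1) FROM T1;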


Added:
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputSplit.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucketizedhiveinputformat.q
    hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketizedhiveinputformat.q.out
Removed:
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/
Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=918793&r1=918792&r2=918793&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Thu Mar  4 00:34:13 2010
@@ -42,6 +42,9 @@
     HIVE-259. Add PERCENTILE aggregate function.
     (Jerome Boulon, Zheng via He Yongqiang)
 
+    HIVE-1197. Add BucketizedHiveInputFormat
+    (Siying Dong via namit)
+
   IMPROVEMENTS
     HIVE-983. Function from_unixtime takes long.
     (Ning Zhang via zshao)

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java?rev=918793&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java Thu Mar  4 00:34:13 2010
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.ql.exec.ExecMapper;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.HadoopShims.CombineFileInputFormatShim;
+import org.apache.hadoop.hive.shims.HadoopShims.InputSplitShim;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.InvalidInputException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * BucketizedHiveInputFormat serves a similar function to HiveInputFormat, but
+ * its getSplits() always groups the splits from one input file into a single
+ * wrapper split. It is useful for applications that require an input file to
+ * be handled by one mapper.
+ */
+public class BucketizedHiveInputFormat<K extends WritableComparable, V extends Writable>
+    extends HiveInputFormat<K, V> {
+
+  public static final Log LOG = LogFactory
+      .getLog("org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat");
+
+  @Override
+  public RecordReader getRecordReader(InputSplit split, JobConf job,
+      Reporter reporter) throws IOException {
+
+    BucketizedHiveInputSplit hsplit = (BucketizedHiveInputSplit) split;
+
+    String inputFormatClassName = null;
+    Class inputFormatClass = null;
+    try {
+      inputFormatClassName = hsplit.inputFormatClassName();
+      inputFormatClass = job.getClassByName(inputFormatClassName);
+    } catch (Exception e) {
+      throw new IOException("cannot find class " + inputFormatClassName);
+    }
+
+    // clone a jobConf for setting needed columns for reading
+    JobConf cloneJobConf = new JobConf(job);
+    initColumnsNeeded(cloneJobConf, inputFormatClass, hsplit.getPath()
+        .toString(), hsplit.getPath().toUri().getPath());
+
+    InputFormat inputFormat = getInputFormatFromCache(inputFormatClass,
+        cloneJobConf);
+    return new BucketizedHiveRecordReader(inputFormat, hsplit, cloneJobConf,
+        reporter);
+  }
+
+  protected FileStatus[] listStatus(FileSystem fs, FileStatus fileStatus)
+      throws IOException {
+    ArrayList<FileStatus> result = new ArrayList<FileStatus>();
+
+    if (fileStatus.isDir()) {
+      for (FileStatus stat : fs.listStatus(fileStatus.getPath())) {
+        for (FileStatus retStat : listStatus(fs, stat)) {
+          result.add(retStat);
+        }
+      }
+    } else {
+      result.add(fileStatus);
+    }
+
+    return result.toArray(new FileStatus[result.size()]);
+
+  }
+
+  protected FileStatus[] listStatus(JobConf job, Path path) throws IOException {
+    ArrayList<FileStatus> result = new ArrayList<FileStatus>();
+    List<IOException> errors = new ArrayList<IOException>();
+
+    FileSystem fs = path.getFileSystem(job);
+    FileStatus[] matches = fs.globStatus(path);
+    if (matches == null) {
+      errors.add(new IOException("Input path does not exist: " + path));
+    } else if (matches.length == 0) {
+      errors.add(new IOException("Input Pattern " + path + " matches 0 files"));
+    } else {
+      for (FileStatus globStat : matches) {
+        for (FileStatus retStat : listStatus(fs, globStat)) {
+          result.add(retStat);
+        }
+      }
+    }
+
+    if (!errors.isEmpty()) {
+      throw new InvalidInputException(errors);
+    }
+    LOG.info("Total input paths to process : " + result.size());
+    return result.toArray(new FileStatus[result.size()]);
+
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    init(job);
+
+    Path[] dirs = FileInputFormat.getInputPaths(job);
+    if (dirs.length == 0) {
+      throw new IOException("No input paths specified in job");
+    }
+    JobConf newjob = new JobConf(job);
+    ArrayList<InputSplit> result = new ArrayList<InputSplit>();
+
+    int numOrigSplits = 0;
+    // for each dir, get all the files under the dir, run getSplits on each
+    // individual file, and then wrap the resulting splits for that file in a
+    // single BucketizedHiveInputSplit
+    for (Path dir : dirs) {
+      PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
+      // create a new InputFormat instance if this is the first time this
+      // class has been seen
+      Class inputFormatClass = part.getInputFileFormatClass();
+      InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
+      newjob.setInputFormat(inputFormat.getClass());
+
+      FileStatus[] listStatus = listStatus(newjob, dir);
+
+      for (FileStatus status : listStatus) {
+        LOG.info("block size: " + status.getBlockSize());
+        LOG.info("file length: " + status.getLen());
+        FileInputFormat.setInputPaths(newjob, status.getPath());
+        InputSplit[] iss = inputFormat.getSplits(newjob, 0);
+        if (iss != null && iss.length > 0) {
+          numOrigSplits += iss.length;
+          result.add(new BucketizedHiveInputSplit(iss, inputFormatClass
+              .getName()));
+        }
+      }
+    }
+    LOG.info(result.size() + " bucketized splits generated from "
+        + numOrigSplits + " original splits.");
+    return result.toArray(new BucketizedHiveInputSplit[result.size()]);
+  }
+}

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputSplit.java?rev=918793&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputSplit.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputSplit.java Thu Mar  4 00:34:13 2010
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.ql.exec.ExecMapper;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.HadoopShims.CombineFileInputFormatShim;
+import org.apache.hadoop.hive.shims.HadoopShims.InputSplitShim;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * BucketizedHiveInputSplit encapsulates the InputSplits from one file together
+ * with their corresponding inputFormatClass. It derives from FileSplit so that
+ * "map.input.file" is still set correctly in MapTask.
+ */
+public class BucketizedHiveInputSplit extends HiveInputSplit {
+
+  protected InputSplit[] inputSplits;
+  protected String inputFormatClassName;
+
+  public String getInputFormatClassName() {
+    return inputFormatClassName;
+  }
+
+  public void setInputFormatClassName(String inputFormatClassName) {
+    this.inputFormatClassName = inputFormatClassName;
+  }
+
+  public BucketizedHiveInputSplit() {
+    // This is the only public constructor of FileSplit
+    super();
+  }
+
+  public BucketizedHiveInputSplit(InputSplit[] inputSplits,
+      String inputFormatClassName) {
+    // This is the only public constructor of FileSplit
+    super();
+
+    assert (inputSplits != null && inputSplits.length > 0);
+    this.inputSplits = inputSplits;
+    this.inputFormatClassName = inputFormatClassName;
+  }
+
+  public int getNumSplits() {
+    return inputSplits.length;
+  }
+
+  public InputSplit getSplit(int idx) {
+    assert (idx >= 0 && idx < inputSplits.length);
+    return inputSplits[idx];
+  }
+
+  public String inputFormatClassName() {
+    return inputFormatClassName;
+  }
+
+  @Override
+  public Path getPath() {
+    if (inputSplits != null && inputSplits.length > 0
+        && inputSplits[0] instanceof FileSplit) {
+      return ((FileSplit) inputSplits[0]).getPath();
+    }
+    return new Path("");
+  }
+
+  /** The position of the first byte in the file to process. */
+  @Override
+  public long getStart() {
+    if (inputSplits != null && inputSplits.length > 0
+        && inputSplits[0] instanceof FileSplit) {
+      return ((FileSplit) inputSplits[0]).getStart();
+    }
+    return 0;
+  }
+
+  @Override
+  public String toString() {
+    if (inputSplits != null && inputSplits.length > 0) {
+      return inputFormatClassName + ":" + inputSplits[0].toString();
+    }
+    return inputFormatClassName + ":null";
+  }
+
+  @Override
+  public long getLength() {
+    long r = 0;
+    if (inputSplits != null) {
+      try {
+        for (InputSplit inputSplit : inputSplits) {
+          r += inputSplit.getLength();
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return r;
+  }
+
+  public long getLength(int idx) {
+    if (inputSplits != null) {
+      try {
+        return inputSplits[idx].getLength();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return -1;
+  }
+
+  @Override
+  public String[] getLocations() throws IOException {
+    assert (inputSplits != null && inputSplits.length > 0);
+    return inputSplits[0].getLocations();
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    String inputSplitClassName = in.readUTF();
+
+    int numSplits = in.readInt();
+    inputSplits = new InputSplit[numSplits];
+    for (int i = 0; i < numSplits; i++) {
+      try {
+        inputSplits[i] = (InputSplit) ReflectionUtils.newInstance(conf
+            .getClassByName(inputSplitClassName), conf);
+      } catch (Exception e) {
+        throw new IOException(
+            "Cannot create an instance of InputSplit class = "
+                + inputSplitClassName + ":" + e.getMessage());
+      }
+      inputSplits[i].readFields(in);
+    }
+    inputFormatClassName = in.readUTF();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    assert (inputSplits != null && inputSplits.length > 0);
+    out.writeUTF(inputSplits[0].getClass().getName());
+    out.writeInt(inputSplits.length);
+    for (InputSplit inputSplit : inputSplits) {
+      inputSplit.write(out);
+    }
+    out.writeUTF(inputFormatClassName);
+  }
+}

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java?rev=918793&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java Thu Mar  4 00:34:13 2010
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.ExecMapper;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+
+/**
+ * BucketizedHiveRecordReader is a wrapper around a list of RecordReaders. It
+ * behaves like HiveRecordReader, except that it wraps the RecordReaders for
+ * all of the splits that came from one file.
+ */
+public class BucketizedHiveRecordReader<K extends WritableComparable, V extends Writable>
+    implements RecordReader<K, V> {
+  protected final BucketizedHiveInputSplit split;
+  protected final InputFormat inputFormat;
+  protected final JobConf jobConf;
+  protected final Reporter reporter;
+  protected RecordReader curReader;
+  protected long progress;
+  protected int idx;
+
+  public BucketizedHiveRecordReader(InputFormat inputFormat,
+      BucketizedHiveInputSplit bucketizedSplit, JobConf jobConf,
+      Reporter reporter) throws IOException {
+    this.split = bucketizedSplit;
+    this.inputFormat = inputFormat;
+    this.jobConf = jobConf;
+    this.reporter = reporter;
+    initNextRecordReader();
+  }
+
+  public void close() throws IOException {
+    if (curReader != null) {
+      curReader.close();
+      curReader = null;
+    }
+    idx = 0;
+  }
+
+  public K createKey() {
+    return (K) curReader.createKey();
+  }
+
+  public V createValue() {
+    return (V) curReader.createValue();
+  }
+
+  public long getPos() throws IOException {
+    if (curReader != null) {
+      return curReader.getPos();
+    } else {
+      return 0;
+    }
+  }
+
+  public float getProgress() throws IOException {
+    // The calculation is strongly dependent on the assumption that all splits
+    // came from the same file
+    return Math.min(1.0f, ((curReader == null) ? progress : curReader.getPos())
+        / (float) (split.getLength()));
+  }
+
+  public boolean next(K key, V value) throws IOException {
+    while ((curReader == null) || !curReader.next(key, value)) {
+      if (!initNextRecordReader()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Get the record reader for the next chunk in this
+   * BucketizedHiveRecordReader.
+   */
+  protected boolean initNextRecordReader() throws IOException {
+    if (curReader != null) {
+      curReader.close();
+      curReader = null;
+      if (idx > 0) {
+        progress += split.getLength(idx - 1); // done processing so far
+      }
+    }
+
+    // if all chunks have been processed, nothing more to do.
+    if (idx == split.getNumSplits()) {
+      return false;
+    }
+
+    // get a record reader for the idx-th chunk
+    try {
+      curReader = inputFormat.getRecordReader(split.getSplit(idx), jobConf,
+          reporter);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    idx++;
+    return true;
+  }
+}

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=918793&r1=918792&r2=918793&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Thu Mar  4 00:34:13 2010
@@ -177,7 +177,7 @@
   /**
    * A cache of InputFormat instances.
    */
-  private static Map<Class, InputFormat<WritableComparable, Writable>> inputFormats;
+  protected static Map<Class, InputFormat<WritableComparable, Writable>> inputFormats;
 
   static InputFormat<WritableComparable, Writable> getInputFormatFromCache(
       Class inputFormatClass, JobConf job) throws IOException {
@@ -223,7 +223,7 @@
         cloneJobConf, reporter));
   }
 
-  private Map<String, PartitionDesc> pathToPartitionInfo;
+  protected Map<String, PartitionDesc> pathToPartitionInfo;
   MapredWork mrwork = null;
 
   protected void init(JobConf job) {

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucketizedhiveinputformat.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucketizedhiveinputformat.q?rev=918793&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucketizedhiveinputformat.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucketizedhiveinputformat.q Thu Mar  4 00:34:13 2010
@@ -0,0 +1,40 @@
+set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
+set mapred.min.split.size = 64;
+set dfs.block.size=64; 
+
+
+CREATE TABLE T1(name STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/kv1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(name STRING) STORED AS SEQUENCEFILE;
+
+EXPLAIN INSERT OVERWRITE TABLE T2 SELECT * FROM (
+SELECT tmp1.name as name FROM (
+  SELECT name, 'MMM' AS n FROM T1) tmp1 
+  JOIN (SELECT 'MMM' AS n FROM T1) tmp2
+  JOIN (SELECT 'MMM' AS n FROM T1) tmp3
+  ON tmp1.n = tmp2.n AND tmp1.n = tmp3.n) ttt LIMIT 5000000;
+
+
+INSERT OVERWRITE TABLE T2 SELECT * FROM (
+SELECT tmp1.name as name FROM (
+  SELECT name, 'MMM' AS n FROM T1) tmp1 
+  JOIN (SELECT 'MMM' AS n FROM T1) tmp2
+  JOIN (SELECT 'MMM' AS n FROM T1) tmp3
+  ON tmp1.n = tmp2.n AND tmp1.n = tmp3.n) ttt LIMIT 5000000;
+
+EXPLAIN SELECT COUNT(1) FROM T2;
+SELECT COUNT(1) FROM T2;
+
+
+CREATE TABLE T3(name STRING) STORED AS TEXTFILE;
+LOAD DATA LOCAL INPATH '../data/files/kv1.txt' INTO TABLE T3;
+LOAD DATA LOCAL INPATH '../data/files/kv2.txt' INTO TABLE T3;
+
+EXPLAIN SELECT COUNT(1) FROM T3;
+SELECT COUNT(1) FROM T3;
+
+DROP TABLE T1;
+DROP TABLE T2;
+DROP TABLE T3;

Added: hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketizedhiveinputformat.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketizedhiveinputformat.q.out?rev=918793&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketizedhiveinputformat.q.out (added)
+++ hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketizedhiveinputformat.q.out Thu Mar  4 00:34:13 2010
@@ -0,0 +1,363 @@
+PREHOOK: query: CREATE TABLE T1(name STRING) STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE T1(name STRING) STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@T1
+PREHOOK: query: LOAD DATA LOCAL INPATH '../data/files/kv1.txt' INTO TABLE T1
+PREHOOK: type: LOAD
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../data/files/kv1.txt' INTO TABLE T1
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@t1
+PREHOOK: query: CREATE TABLE T2(name STRING) STORED AS SEQUENCEFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE T2(name STRING) STORED AS SEQUENCEFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@T2
+PREHOOK: query: EXPLAIN INSERT OVERWRITE TABLE T2 SELECT * FROM (
+SELECT tmp1.name as name FROM (
+  SELECT name, 'MMM' AS n FROM T1) tmp1 
+  JOIN (SELECT 'MMM' AS n FROM T1) tmp2
+  JOIN (SELECT 'MMM' AS n FROM T1) tmp3
+  ON tmp1.n = tmp2.n AND tmp1.n = tmp3.n) ttt LIMIT 5000000
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN INSERT OVERWRITE TABLE T2 SELECT * FROM (
+SELECT tmp1.name as name FROM (
+  SELECT name, 'MMM' AS n FROM T1) tmp1 
+  JOIN (SELECT 'MMM' AS n FROM T1) tmp2
+  JOIN (SELECT 'MMM' AS n FROM T1) tmp3
+  ON tmp1.n = tmp2.n AND tmp1.n = tmp3.n) ttt LIMIT 5000000
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_JOIN (TOK_SUBQUERY
(TOK_QUERY (TOK_FROM (TOK_TABREF T1)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE))
(TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL name)) (TOK_SELEXPR 'MMM' n)))) tmp1) (TOK_SUBQUERY
(TOK_QUERY (TOK_FROM (TOK_TABREF T1)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE))
(TOK_SELECT (TOK_SELEXPR 'MMM' n)))) tmp2)) (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF
T1)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR 'MMM' n))))
tmp3) (AND (= (. (TOK_TABLE_OR_COL tmp1) n) (. (TOK_TABLE_OR_COL tmp2) n)) (= (. (TOK_TABLE_OR_COL
tmp1) n) (. (TOK_TABLE_OR_COL tmp3) n))))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE))
(TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL tmp1) name) name)))) ttt)) (TOK_INSERT (TOK_DESTINATION
(TOK_TAB T2)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_LIMIT 5000000)))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-3 depends on stages: Stage-2
+  Stage-0 depends on stages: Stage-3
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        ttt:tmp1:t1 
+          TableScan
+            alias: t1
+            Select Operator
+              expressions:
+                    expr: name
+                    type: string
+                    expr: 'MMM'
+                    type: string
+              outputColumnNames: _col0, _col1
+              Reduce Output Operator
+                sort order: 
+                tag: 0
+                value expressions:
+                      expr: _col0
+                      type: string
+                      expr: _col1
+                      type: string
+        ttt:tmp2:t1 
+          TableScan
+            alias: t1
+            Select Operator
+              expressions:
+                    expr: 'MMM'
+                    type: string
+              outputColumnNames: _col0
+              Reduce Output Operator
+                sort order: 
+                tag: 1
+                value expressions:
+                      expr: _col0
+                      type: string
+      Reduce Operator Tree:
+        Join Operator
+          condition map:
+               Inner Join 0 to 1
+          condition expressions:
+            0 {VALUE._col0} {VALUE._col1}
+            1 {VALUE._col0}
+          handleSkewJoin: false
+          outputColumnNames: _col0, _col1, _col2
+          Filter Operator
+            predicate:
+                expr: (_col1 = _col2)
+                type: boolean
+            File Output Operator
+              compressed: false
+              GlobalTableId: 0
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+
+  Stage: Stage-2
+    Map Reduce
+      Alias -> Map Operator Tree:
+        $INTNAME 
+            Reduce Output Operator
+              key expressions:
+                    expr: _col1
+                    type: string
+              sort order: +
+              Map-reduce partition columns:
+                    expr: _col1
+                    type: string
+              tag: 0
+              value expressions:
+                    expr: _col0
+                    type: string
+        ttt:tmp3:t1 
+          TableScan
+            alias: t1
+            Select Operator
+              expressions:
+                    expr: 'MMM'
+                    type: string
+              outputColumnNames: _col0
+              Reduce Output Operator
+                key expressions:
+                      expr: _col0
+                      type: string
+                sort order: +
+                Map-reduce partition columns:
+                      expr: _col0
+                      type: string
+                tag: 1
+      Reduce Operator Tree:
+        Join Operator
+          condition map:
+               Inner Join 0 to 1
+          condition expressions:
+            0 {VALUE._col1}
+            1 
+          handleSkewJoin: false
+          outputColumnNames: _col1
+          Select Operator
+            expressions:
+                  expr: _col1
+                  type: string
+            outputColumnNames: _col0
+            Select Operator
+              expressions:
+                    expr: _col0
+                    type: string
+              outputColumnNames: _col0
+              Limit
+                File Output Operator
+                  compressed: false
+                  GlobalTableId: 0
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+
+  Stage: Stage-3
+    Map Reduce
+      Alias -> Map Operator Tree:
+        file:/data/users/sdong/trunk/VENDOR.hive/trunk/build/ql/scratchdir/hive_2010-03-03_13-34-44_281_7699816024642764738/10003

+            Reduce Output Operator
+              sort order: 
+              tag: -1
+              value expressions:
+                    expr: _col0
+                    type: string
+      Reduce Operator Tree:
+        Extract
+          Limit
+            File Output Operator
+              compressed: false
+              GlobalTableId: 1
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                  name: t2
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: true
+          table:
+              input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: t2
+
+
+PREHOOK: query: INSERT OVERWRITE TABLE T2 SELECT * FROM (
+SELECT tmp1.name as name FROM (
+  SELECT name, 'MMM' AS n FROM T1) tmp1 
+  JOIN (SELECT 'MMM' AS n FROM T1) tmp2
+  JOIN (SELECT 'MMM' AS n FROM T1) tmp3
+  ON tmp1.n = tmp2.n AND tmp1.n = tmp3.n) ttt LIMIT 5000000
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1
+PREHOOK: Output: default@t2
+POSTHOOK: query: INSERT OVERWRITE TABLE T2 SELECT * FROM (
+SELECT tmp1.name as name FROM (
+  SELECT name, 'MMM' AS n FROM T1) tmp1 
+  JOIN (SELECT 'MMM' AS n FROM T1) tmp2
+  JOIN (SELECT 'MMM' AS n FROM T1) tmp3
+  ON tmp1.n = tmp2.n AND tmp1.n = tmp3.n) ttt LIMIT 5000000
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1
+POSTHOOK: Output: default@t2
+PREHOOK: query: EXPLAIN SELECT COUNT(1) FROM T2
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN SELECT COUNT(1) FROM T2
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF T2)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE))
(TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION COUNT 1)))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        t2 
+          TableScan
+            alias: t2
+            Select Operator
+              Group By Operator
+                aggregations:
+                      expr: count(1)
+                bucketGroup: false
+                mode: hash
+                outputColumnNames: _col0
+                Reduce Output Operator
+                  sort order: 
+                  tag: -1
+                  value expressions:
+                        expr: _col0
+                        type: bigint
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations:
+                expr: count(VALUE._col0)
+          bucketGroup: false
+          mode: mergepartial
+          outputColumnNames: _col0
+          Select Operator
+            expressions:
+                  expr: _col0
+                  type: bigint
+            outputColumnNames: _col0
+            File Output Operator
+              compressed: false
+              GlobalTableId: 0
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
+PREHOOK: query: SELECT COUNT(1) FROM T2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t2
+PREHOOK: Output: file:/data/users/sdong/trunk/VENDOR.hive/trunk/build/ql/scratchdir/hive_2010-03-03_13-36-25_460_1971004153631800172/10000
+POSTHOOK: query: SELECT COUNT(1) FROM T2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t2
+POSTHOOK: Output: file:/data/users/sdong/trunk/VENDOR.hive/trunk/build/ql/scratchdir/hive_2010-03-03_13-36-25_460_1971004153631800172/10000
+5000000
+PREHOOK: query: CREATE TABLE T3(name STRING) STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE T3(name STRING) STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@T3
+PREHOOK: query: LOAD DATA LOCAL INPATH '../data/files/kv1.txt' INTO TABLE T3
+PREHOOK: type: LOAD
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../data/files/kv1.txt' INTO TABLE T3
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@t3
+PREHOOK: query: LOAD DATA LOCAL INPATH '../data/files/kv2.txt' INTO TABLE T3
+PREHOOK: type: LOAD
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../data/files/kv2.txt' INTO TABLE T3
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@t3
+PREHOOK: query: EXPLAIN SELECT COUNT(1) FROM T3
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN SELECT COUNT(1) FROM T3
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF T3)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE))
(TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION COUNT 1)))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        t3 
+          TableScan
+            alias: t3
+            Select Operator
+              Group By Operator
+                aggregations:
+                      expr: count(1)
+                bucketGroup: false
+                mode: hash
+                outputColumnNames: _col0
+                Reduce Output Operator
+                  sort order: 
+                  tag: -1
+                  value expressions:
+                        expr: _col0
+                        type: bigint
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations:
+                expr: count(VALUE._col0)
+          bucketGroup: false
+          mode: mergepartial
+          outputColumnNames: _col0
+          Select Operator
+            expressions:
+                  expr: _col0
+                  type: bigint
+            outputColumnNames: _col0
+            File Output Operator
+              compressed: false
+              GlobalTableId: 0
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
+PREHOOK: query: SELECT COUNT(1) FROM T3
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t3
+PREHOOK: Output: file:/data/users/sdong/trunk/VENDOR.hive/trunk/build/ql/scratchdir/hive_2010-03-03_13-36-38_029_7372303432096245458/10000
+POSTHOOK: query: SELECT COUNT(1) FROM T3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t3
+POSTHOOK: Output: file:/data/users/sdong/trunk/VENDOR.hive/trunk/build/ql/scratchdir/hive_2010-03-03_13-36-38_029_7372303432096245458/10000
+1000
+PREHOOK: query: DROP TABLE T1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE T1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: default@t1
+PREHOOK: query: DROP TABLE T2
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE T2
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: default@t2
+PREHOOK: query: DROP TABLE T3
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE T3
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: default@t3


