From gunt...@apache.org
Subject svn commit: r1508202 [8/48] - in /hive/branches/tez: ./ beeline/src/java/org/apache/hive/beeline/ cli/src/java/org/apache/hadoop/hive/cli/ common/src/java/org/apache/hadoop/hive/common/metrics/ common/src/java/org/apache/hadoop/hive/conf/ common/src/te...
Date Mon, 29 Jul 2013 21:08:19 GMT
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Mon Jul 29 21:08:03 2013
@@ -33,6 +33,8 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -40,12 +42,13 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
@@ -231,6 +234,14 @@ class WriterImpl implements Writer, Memo
     }
 
     /**
+     * Check the state of the suppress flag in the output stream.
+     * @return the value of the suppress flag
+     */
+    public boolean isSuppressed() {
+      return outStream.isSuppressed();
+    }
+
+    /**
      * Write the saved compressed buffers to the OutputStream.
      * @param out the stream to write to
      * @throws IOException
@@ -291,9 +302,9 @@ class WriterImpl implements Writer, Memo
      * @return The output outStream that the section needs to be written to.
      * @throws IOException
      */
-    public PositionedOutputStream createStream(int column,
-                                               OrcProto.Stream.Kind kind
-                                              ) throws IOException {
+    public OutStream createStream(int column,
+                                  OrcProto.Stream.Kind kind
+                                  ) throws IOException {
       StreamName name = new StreamName(column, kind);
       BufferedStream result = streams.get(name);
       if (result == null) {
@@ -325,6 +336,14 @@ class WriterImpl implements Writer, Memo
     public boolean buildIndex() {
       return buildIndex;
     }
+
+    /**
+     * Is the ORC file compressed?
+     * @return are the streams compressed
+     */
+    public boolean isCompressed() {
+      return codec != null;
+    }
   }
 
   /**
@@ -337,6 +356,7 @@ class WriterImpl implements Writer, Memo
     protected final int id;
     protected final ObjectInspector inspector;
     private final BitFieldWriter isPresent;
+    private final boolean isCompressed;
     protected final ColumnStatisticsImpl indexStatistics;
     private final ColumnStatisticsImpl fileStatistics;
     protected TreeWriter[] childrenWriters;
@@ -344,6 +364,8 @@ class WriterImpl implements Writer, Memo
     private final OrcProto.RowIndex.Builder rowIndex;
     private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
     private final PositionedOutputStream rowIndexStream;
+    private boolean foundNulls;
+    private OutStream isPresentOutStream;
 
     /**
      * Create a tree writer.
@@ -356,14 +378,17 @@ class WriterImpl implements Writer, Memo
     TreeWriter(int columnId, ObjectInspector inspector,
                StreamFactory streamFactory,
                boolean nullable) throws IOException {
+      this.isCompressed = streamFactory.isCompressed();
       this.id = columnId;
       this.inspector = inspector;
       if (nullable) {
-        isPresent = new BitFieldWriter(streamFactory.createStream(id,
-            OrcProto.Stream.Kind.PRESENT), 1);
+        isPresentOutStream = streamFactory.createStream(id,
+            OrcProto.Stream.Kind.PRESENT);
+        isPresent = new BitFieldWriter(isPresentOutStream, 1);
       } else {
         isPresent = null;
       }
+      this.foundNulls = false;
       indexStatistics = ColumnStatisticsImpl.create(inspector);
       fileStatistics = ColumnStatisticsImpl.create(inspector);
       childrenWriters = new TreeWriter[0];
@@ -401,6 +426,20 @@ class WriterImpl implements Writer, Memo
       }
       if (isPresent != null) {
         isPresent.write(obj == null ? 0 : 1);
+        if(obj == null) {
+          foundNulls = true;
+        }
+      }
+    }
+
+    private void removeIsPresentPositions() {
+      for(int i=0; i < rowIndex.getEntryCount(); ++i) {
+        RowIndexEntry.Builder entry = rowIndex.getEntryBuilder(i);
+        List<Long> positions = entry.getPositionsList();
+        // bit streams use 3 positions if uncompressed, 4 if compressed
+        positions = positions.subList(isCompressed ? 4 : 3, positions.size());
+        entry.clearPositions();
+        entry.addAllPositions(positions);
       }
     }
 
@@ -418,7 +457,21 @@ class WriterImpl implements Writer, Memo
                      int requiredIndexEntries) throws IOException {
       if (isPresent != null) {
         isPresent.flush();
+
+        // if no nulls were found in this stripe, then suppress the isPresent stream
+        if(!foundNulls) {
+          isPresentOutStream.suppress();
+          // since the isPresent bitstream is suppressed, update the index to
+          // remove the positions of the isPresent stream
+          if (rowIndexStream != null) {
+            removeIsPresentPositions();
+          }
+        }
       }
+
+      // reset the flag for next stripe
+      foundNulls = false;
+
       builder.addColumns(getEncoding());
       if (rowIndexStream != null) {
         if (rowIndex.getEntryCount() != requiredIndexEntries) {
@@ -810,6 +863,7 @@ class WriterImpl implements Writer, Memo
      * and augments them with the final information as the stripe is written.
      * @throws IOException
      */
+    @Override
     void createRowIndexEntry() throws IOException {
       getFileStatistics().merge(indexStatistics);
       OrcProto.RowIndexEntry.Builder rowIndexEntry = getRowIndexEntry();
@@ -936,6 +990,46 @@ class WriterImpl implements Writer, Memo
     }
   }
 
+  private static class DateTreeWriter extends TreeWriter {
+    private final RunLengthIntegerWriter writer;
+
+    DateTreeWriter(int columnId,
+                   ObjectInspector inspector,
+                   StreamFactory writer,
+                   boolean nullable) throws IOException {
+      super(columnId, inspector, writer, nullable);
+      PositionedOutputStream out = writer.createStream(id,
+          OrcProto.Stream.Kind.DATA);
+      this.writer = new RunLengthIntegerWriter(out, true);
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void write(Object obj) throws IOException {
+      super.write(obj);
+      if (obj != null) {
+        // Using the Writable here as it's used directly for writing as well as for stats.
+        DateWritable val = ((DateObjectInspector) inspector).getPrimitiveWritableObject(obj);
+        indexStatistics.updateDate(val);
+        writer.write(val.getDays());
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      writer.flush();
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      writer.getPosition(recorder);
+    }
+  }
+
   private static class DecimalTreeWriter extends TreeWriter {
     private final PositionedOutputStream valueStream;
     private final RunLengthIntegerWriter scaleStream;
@@ -1210,6 +1304,9 @@ class WriterImpl implements Writer, Memo
           case TIMESTAMP:
             return new TimestampTreeWriter(streamFactory.getNextColumnId(),
                 inspector, streamFactory, nullable);
+          case DATE:
+            return new DateTreeWriter(streamFactory.getNextColumnId(),
+                inspector, streamFactory, nullable);
           case DECIMAL:
             return new DecimalTreeWriter(streamFactory.getNextColumnId(),
                 inspector, streamFactory,  nullable);
@@ -1272,6 +1369,9 @@ class WriterImpl implements Writer, Memo
           case TIMESTAMP:
             type.setKind(OrcProto.Type.Kind.TIMESTAMP);
             break;
+          case DATE:
+            type.setKind(OrcProto.Type.Kind.DATE);
+            break;
           case DECIMAL:
             type.setKind(OrcProto.Type.Kind.DECIMAL);
             break;
@@ -1349,19 +1449,21 @@ class WriterImpl implements Writer, Memo
       long indexEnd = start;
       for(Map.Entry<StreamName, BufferedStream> pair: streams.entrySet()) {
         BufferedStream stream = pair.getValue();
-        stream.flush();
-        stream.spillTo(rawWriter);
-        stream.clear();
-        long end = rawWriter.getPos();
-        StreamName name = pair.getKey();
-        builder.addStreams(OrcProto.Stream.newBuilder()
-            .setColumn(name.getColumn())
-            .setKind(name.getKind())
-            .setLength(end-section));
-        section = end;
-        if (StreamName.Area.INDEX == name.getArea()) {
-          indexEnd = end;
+        if (!stream.isSuppressed()) {
+          stream.flush();
+          stream.spillTo(rawWriter);
+          long end = rawWriter.getPos();
+          StreamName name = pair.getKey();
+          builder.addStreams(OrcProto.Stream.newBuilder()
+              .setColumn(name.getColumn())
+              .setKind(name.getKind())
+              .setLength(end-section));
+          section = end;
+          if (StreamName.Area.INDEX == name.getArea()) {
+            indexEnd = end;
+          }
         }
+        stream.clear();
       }
       builder.build().writeTo(protobufWriter);
       protobufWriter.flush();
@@ -1430,7 +1532,10 @@ class WriterImpl implements Writer, Memo
     OrcProto.PostScript.Builder builder =
       OrcProto.PostScript.newBuilder()
         .setCompression(writeCompressionKind(compress))
-        .setFooterLength(footerLength);
+        .setFooterLength(footerLength)
+        .setMagic(OrcFile.MAGIC)
+        .addVersion(OrcFile.MAJOR_VERSION)
+        .addVersion(OrcFile.MINOR_VERSION);
     if (compress != CompressionKind.NONE) {
       builder.setCompressionBlockSize(bufferSize);
     }

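For readers following the ORC change above: when a stripe contains no nulls for a column, the PRESENT bitstream is now suppressed and its recorded positions are trimmed from the front of every row-index entry (bit streams record 3 positions when uncompressed, 4 when compressed). A minimal standalone sketch of that trimming, using plain lists rather than the OrcProto builders (illustrative only, not code from the patch):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    class PresentPositionTrimSketch {
      // Drop the leading PRESENT-stream positions from one row-index entry.
      static List<Long> trimPresentPositions(List<Long> positions, boolean compressed) {
        int skip = compressed ? 4 : 3; // same constants as removeIsPresentPositions
        return new ArrayList<Long>(positions.subList(skip, positions.size()));
      }

      public static void main(String[] args) {
        // 4 PRESENT positions (compressed case) followed by the DATA positions
        List<Long> entry = Arrays.asList(0L, 0L, 0L, 0L, 128L, 42L);
        System.out.println(trimPresentPositions(entry, true)); // [128, 42]
      }
    }
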
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java Mon Jul 29 21:08:03 2013
@@ -35,11 +35,11 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.exec.HadoopJobExecHelper;
-import org.apache.hadoop.hive.ql.exec.HadoopJobExecHook;
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.Throttle;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHelper;
+import org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHook;
+import org.apache.hadoop.hive.ql.exec.mr.Throttle;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -192,7 +192,7 @@ public class BlockMergeTask extends Task
     try {
       addInputPaths(job, work);
 
-      Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
+      Utilities.setMapWork(job, work, ctx.getMRTmpFileURI());
 
       // remove the pwd from conf file so that job tracker doesn't show this
       // logs

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java Mon Jul 29 21:08:03 2013
@@ -33,14 +33,14 @@ import org.apache.hadoop.hive.ql.io.Comb
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.Mapper;
 
 @Explain(displayName = "Block level merge")
-public class MergeWork extends MapredWork implements Serializable {
+public class MergeWork extends MapWork implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
@@ -70,9 +70,6 @@ public class MergeWork extends MapredWor
     if(this.getPathToPartitionInfo() == null) {
       this.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>());
     }
-    if(this.getNumReduceTasks() == null) {
-      this.setNumReduceTasks(0);
-    }
     for(String path: this.inputPaths) {
       this.getPathToPartitionInfo().put(path, partDesc);
     }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java Mon Jul 29 21:08:03 2013
@@ -36,14 +36,15 @@ import org.apache.hadoop.hive.ql.Context
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.exec.HadoopJobExecHelper;
-import org.apache.hadoop.hive.ql.exec.HadoopJobExecHook;
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.Throttle;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHelper;
+import org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHook;
+import org.apache.hadoop.hive.ql.exec.mr.Throttle;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -181,7 +182,9 @@ public class PartialScanTask extends Tas
     try {
       addInputPaths(job, work);
 
-      Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
+      MapredWork mrWork = new MapredWork();
+      mrWork.setMapWork(work);
+      Utilities.setMapRedWork(job, mrWork, ctx.getMRTmpFileURI());
 
       // remove the pwd from conf file so that job tracker doesn't show this
       // logs

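The same migration recurs in these hunks: MergeWork, PartialScanWork, and ColumnTruncateWork now extend MapWork rather than MapredWork, so PartialScanTask above and ColumnTruncateTask below wrap the MapWork in a fresh MapredWork before calling the old Utilities.setMapRedWork entry point (BlockMergeTask instead calls the new Utilities.setMapWork directly). A hedged sketch of the wrapper step, using only names from the patch:

    import org.apache.hadoop.hive.ql.plan.MapWork;
    import org.apache.hadoop.hive.ql.plan.MapredWork;

    class MapWorkWrapSketch {
      // Wrap a MapWork in a MapredWork, as the tasks above now do before
      // calling Utilities.setMapRedWork(job, mrWork, tmpFileUri).
      static MapredWork wrap(MapWork work) {
        MapredWork mrWork = new MapredWork();
        mrWork.setMapWork(work);
        return mrWork;
      }
    }
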
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java Mon Jul 29 21:08:03 2013
@@ -25,7 +25,7 @@ import java.util.List;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileBlockMergeInputFormat;
 import org.apache.hadoop.hive.ql.plan.Explain;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.mapred.Mapper;
 
@@ -34,7 +34,7 @@ import org.apache.hadoop.mapred.Mapper;
  *
  */
 @Explain(displayName = "Partial Scan Statistics")
-public class PartialScanWork extends MapredWork implements Serializable {
+public class PartialScanWork extends MapWork implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
@@ -52,9 +52,6 @@ public class PartialScanWork extends Map
     if(this.getPathToPartitionInfo() == null) {
       this.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>());
     }
-    if(this.getNumReduceTasks() == null) {
-      this.setNumReduceTasks(0);
-    }
     for(String path: this.inputPaths) {
       this.getPathToPartitionInfo().put(path, partDesc);
     }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java Mon Jul 29 21:08:03 2013
@@ -76,7 +76,7 @@ public class ColumnTruncateMapper extend
   @Override
   public void configure(JobConf job) {
     jc = job;
-    work = (ColumnTruncateWork) Utilities.getMapRedWork(job);
+    work = (ColumnTruncateWork) Utilities.getMapWork(job);
 
     String specPath = work.getOutputDir();
     Path tmpPath = Utilities.toTempPath(specPath);

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java Mon Jul 29 21:08:03 2013
@@ -28,13 +28,14 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.exec.HadoopJobExecHelper;
-import org.apache.hadoop.hive.ql.exec.HadoopJobExecHook;
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.Throttle;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHelper;
+import org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHook;
+import org.apache.hadoop.hive.ql.exec.mr.Throttle;
 import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -165,7 +166,9 @@ public class ColumnTruncateTask extends 
     try {
       addInputPaths(job, work);
 
-      Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
+      MapredWork mrWork = new MapredWork();
+      mrWork.setMapWork(work);
+      Utilities.setMapRedWork(job, mrWork, ctx.getMRTmpFileURI());
 
       // remove the pwd from conf file so that job tracker doesn't show this
       // logs

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateWork.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateWork.java Mon Jul 29 21:08:03 2013
@@ -27,12 +27,12 @@ import org.apache.hadoop.hive.ql.io.rcfi
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.mapred.Mapper;
 
 @Explain(displayName = "Column Truncate")
-public class ColumnTruncateWork extends MapredWork implements Serializable {
+public class ColumnTruncateWork extends MapWork implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
@@ -64,9 +64,6 @@ public class ColumnTruncateWork extends 
     if(this.getPathToPartitionInfo() == null) {
       this.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>());
     }
-    if(this.getNumReduceTasks() == null) {
-      this.setNumReduceTasks(0);
-    }
     this.getPathToPartitionInfo().put(inputDir, partDesc);
   }
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java Mon Jul 29 21:08:03 2013
@@ -75,6 +75,13 @@ public class DefaultGraphWalker implemen
    * @throws SemanticException
    */
   public void dispatch(Node nd, Stack<Node> ndStack) throws SemanticException {
+    dispatchAndReturn(nd, ndStack);
+  }
+
+  /**
+   * Returns dispatch result
+   */
+  public <T> T dispatchAndReturn(Node nd, Stack<Node> ndStack) throws SemanticException {
     Object[] nodeOutputs = null;
     if (nd.getChildren() != null) {
       nodeOutputs = new Object[nd.getChildren().size()];
@@ -86,6 +93,7 @@ public class DefaultGraphWalker implemen
 
     Object retVal = dispatcher.dispatch(nd, ndStack, nodeOutputs);
     retMap.put(nd, retVal);
+    return (T) retVal;
   }
 
   /**

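The new dispatchAndReturn above hands the dispatcher's result back through an unchecked cast, letting the caller's assignment target fix the type parameter. A self-contained sketch of that pattern (these are stand-in classes, not Hive's):

    import java.util.HashMap;
    import java.util.Map;

    class DispatchReturnSketch {
      private final Map<String, Object> retMap = new HashMap<String, Object>();

      @SuppressWarnings("unchecked")
      <T> T dispatchAndReturn(String node, Object retVal) {
        retMap.put(node, retVal); // result still recorded, as in DefaultGraphWalker
        return (T) retVal;        // unchecked: the caller must know the real type
      }

      public static void main(String[] args) {
        DispatchReturnSketch s = new DispatchReturnSketch();
        Integer cost = s.dispatchAndReturn("TS%", 42); // T inferred as Integer
        System.out.println(cost);
      }
    }
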
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultRuleDispatcher.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultRuleDispatcher.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultRuleDispatcher.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultRuleDispatcher.java Mon Jul 29 21:08:03 2013
@@ -60,6 +60,7 @@ public class DefaultRuleDispatcher imple
    *          the operators encountered so far
    * @throws SemanticException
    */
+  @Override
   public Object dispatch(Node nd, Stack<Node> ndStack, Object... nodeOutputs)
       throws SemanticException {
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/RuleRegExp.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/RuleRegExp.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/RuleRegExp.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/RuleRegExp.java Mon Jul 29 21:08:03 2013
@@ -58,9 +58,10 @@ public class RuleRegExp implements Rule 
    * @return cost of the function
    * @throws SemanticException
    */
+  @Override
   public int cost(Stack<Node> stack) throws SemanticException {
     int numElems = (stack != null ? stack.size() : 0);
-    String name = new String();
+    String name = "";
     for (int pos = numElems - 1; pos >= 0; pos--) {
       name = stack.get(pos).getName() + "%" + name;
       Matcher m = pattern.matcher(name);
@@ -68,13 +69,13 @@ public class RuleRegExp implements Rule 
         return m.group().length();
       }
     }
-
     return -1;
   }
 
   /**
    * @return the name of the Node
    **/
+  @Override
   public String getName() {
     return ruleName;
   }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java Mon Jul 29 21:08:03 2013
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.lockmgr;
 
+import java.util.Arrays;
+
 import org.apache.hadoop.hive.ql.metadata.DummyPartition;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -34,17 +36,29 @@ public class HiveLockObject {
     private String queryStr;
     private String clientIp;
 
+    /**
+     * Constructor
+     *
+     * Note: the parameters are used to uniquely identify a HiveLockObject.
+     * Any ':' characters are stripped from the parameters so that they do not
+     * interfere with the way the data is serialized (a ':'-delimited string).
+     */
     public HiveLockObjectData(String queryId,
         String lockTime,
         String lockMode,
         String queryStr) {
-      this.queryId = queryId;
-      this.lockTime = lockTime;
-      this.lockMode = lockMode;
-      this.queryStr = queryStr.trim();
+      this.queryId = removeDelimiter(queryId);
+      this.lockTime = removeDelimiter(lockTime);
+      this.lockMode = removeDelimiter(lockMode);
+      this.queryStr = removeDelimiter(queryStr.trim());
     }
 
-
+    /**
+     * Constructor
+     * 
+     * @param data String of the form "queryId:lockTime:lockMode:queryStr". 
+     * No ':' characters are allowed in any of the components.
+     */
     public HiveLockObjectData(String data) {
       if (data == null) {
         return;
@@ -94,16 +108,16 @@ public class HiveLockObject {
       }
 
       HiveLockObjectData target = (HiveLockObjectData) o;
-      boolean ret = (queryId == null) ? target.getQueryId() == null :
-          queryId.equals(target.getQueryId());
-      ret = ret && (lockTime == null) ? target.getLockTime() == null :
-          queryId.equals(target.getLockTime());
-      ret = ret && (lockMode == null) ? target.getLockMode() == null :
-          queryId.equals(target.getLockMode());
-      ret = ret && (queryStr == null) ? target.getQueryStr() == null :
-          queryStr.equals(target.getQueryStr());
-      ret = ret && (clientIp == null) ? target.getClientIp() == null :
-          clientIp.equals(target.getClientIp());
+      boolean ret = (queryId == null ? target.queryId == null :
+          target.queryId != null && queryId.equals(target.queryId));
+      ret = ret && (lockTime == null ? target.lockTime == null :
+          target.lockTime != null && lockTime.equals(target.lockTime));
+      ret = ret && (lockMode == null ? target.lockMode == null :
+          target.lockMode != null && lockMode.equals(target.lockMode));
+      ret = ret && (queryStr == null ? target.queryStr == null :
+          target.queryStr != null && queryStr.equals(target.queryStr));
+      ret = ret && (clientIp == null ? target.clientIp == null :
+          target.clientIp != null && clientIp.equals(target.clientIp));
 
       return ret;
     }
@@ -145,20 +159,17 @@ public class HiveLockObject {
   }
 
   public String getName() {
-    if (this.pathNames == null) {
+    if (pathNames == null) {
       return null;
     }
-    String ret = "";
-    boolean first = true;
+    StringBuilder builder = new StringBuilder();
     for (int i = 0; i < pathNames.length; i++) {
-      if (!first) {
-        ret = ret + "/";
-      } else {
-        first = false;
+      if (i > 0) {
+        builder.append('/');
       }
-      ret = ret + pathNames[i];
+      builder.append(pathNames[i]);
     }
-    return ret;
+    return builder.toString();
   }
 
   public String getDisplayName() {
@@ -200,7 +211,15 @@ public class HiveLockObject {
     }
 
     HiveLockObject tgt = (HiveLockObject) o;
-    return getName().equals(tgt.getName()) &&
-        data == null ? tgt.getData() == null : data.equals(tgt.getData());
+    return Arrays.equals(pathNames, tgt.pathNames) &&
+        data == null ? tgt.getData() == null :
+        tgt.getData() != null && data.equals(tgt.getData());
+  }
+
+  private static String removeDelimiter(String in) {
+    if (in == null) {
+      return null;
+    }
+    return in.replaceAll(":","");
   }
 }

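The constraint documented in the HiveLockObjectData constructor above exists because the lock data round-trips through a ':'-delimited string. A small sketch of why stripping the delimiter keeps the parse stable (removeDelimiter is copied from the patch; the sample values are made up):

    class LockDataDelimiterSketch {
      static String removeDelimiter(String in) {
        return in == null ? null : in.replaceAll(":", "");
      }

      public static void main(String[] args) {
        String queryId = removeDelimiter("query:42"); // becomes "query42"
        String lockTime = removeDelimiter("1375131199");
        String lockMode = removeDelimiter("IMPLICIT");
        String queryStr = removeDelimiter("select 1");
        // the form parsed by HiveLockObjectData(String data)
        String data = queryId + ":" + lockTime + ":" + lockMode + ":" + queryStr;
        System.out.println(data.split(":").length); // always 4 after stripping
      }
    }
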
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java Mon Jul 29 21:08:03 2013
@@ -55,6 +55,8 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class ZooKeeperHiveLockManager implements HiveLockManager {
   HiveLockManagerCtx ctx;
   public static final Log LOG = LogFactory.getLog("ZooKeeperHiveLockManager");
@@ -439,20 +441,25 @@ public class ZooKeeperHiveLockManager im
   }
 
   /* Remove the lock specified */
-  private static void unlockPrimitive(HiveConf conf, ZooKeeper zkpClient,
+  @VisibleForTesting
+  static void unlockPrimitive(HiveConf conf, ZooKeeper zkpClient,
                              HiveLock hiveLock, String parent) throws LockException {
     ZooKeeperHiveLock zLock = (ZooKeeperHiveLock)hiveLock;
     try {
+      // can throw KeeperException.NoNodeException, which might mean something is wrong
       zkpClient.delete(zLock.getPath(), -1);
 
       // Delete the parent node if all the children have been deleted
       HiveLockObject obj = zLock.getHiveLockObject();
       String name  = getLastObjectName(parent, obj);
 
-      List<String> children = zkpClient.getChildren(name, false);
-      if ((children == null) || (children.isEmpty()))
-      {
-        zkpClient.delete(name, -1);
+      try {
+        List<String> children = zkpClient.getChildren(name, false);
+        if (children == null || children.isEmpty()) {
+          zkpClient.delete(name, -1);
+        }
+      } catch (KeeperException.NoNodeException e) {
+        LOG.debug("Node " + name + " previously deleted when attempting to delete.");
       }
     } catch (Exception e) {
       LOG.error("Failed to release ZooKeeper lock: ", e);

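The unlockPrimitive change above makes the parent-node cleanup tolerant of a race where another client deletes the node first. A minimal sketch of the same idempotent-delete idiom against the ZooKeeper client API (standalone, not the Hive code):

    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooKeeper;

    class IdempotentDeleteSketch {
      // Delete a parent znode only when it has no remaining children,
      // treating "already gone" as success rather than an error.
      static void deleteIfEmpty(ZooKeeper zk, String path)
          throws KeeperException, InterruptedException {
        try {
          if (zk.getChildren(path, false).isEmpty()) {
            zk.delete(path, -1); // version -1 matches any version
          }
        } catch (KeeperException.NoNodeException e) {
          // node was deleted concurrently; nothing left to do
        }
      }
    }
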
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Mon Jul 29 21:08:03 2013
@@ -729,8 +729,9 @@ public class Hive {
       List<Order> sortCols = new ArrayList<Order>();
       storageDescriptor.setBucketCols(null);
       int k = 0;
-      for (int i = 0; i < storageDescriptor.getCols().size(); i++) {
-        FieldSchema col = storageDescriptor.getCols().get(i);
+      Table metaBaseTbl = new Table(baseTbl);
+      for (int i = 0; i < metaBaseTbl.getCols().size(); i++) {
+        FieldSchema col = metaBaseTbl.getCols().get(i);
         if (indexedCols.contains(col.getName())) {
           indexTblCols.add(col);
           sortCols.add(new Order(col.getName(), 1));
@@ -950,7 +951,7 @@ public class Hive {
     } catch (NoSuchObjectException e) {
       if (throwException) {
         LOG.error(StringUtils.stringifyException(e));
-        throw new InvalidTableException("Table " + tableName + " not found ", tableName);
+        throw new InvalidTableException(tableName);
       }
       return null;
     } catch (Exception e) {
@@ -2113,12 +2114,75 @@ private void constructOneLBLocationMap(F
     return false;
   }
 
+  //It is assumed that the parent directory of destf already exists when this
+  //method is called. When replace is true, this method differs from a plain mv:
+  //if destf is an existing directory, it replaces destf instead of moving the
+  //source under it, and the replaced destf preserves the original destf's permission.
+  static protected boolean renameFile(HiveConf conf, Path srcf, Path destf, FileSystem fs,
+      boolean replace) throws HiveException {
+    boolean success = false;
+    boolean inheritPerms = HiveConf.getBoolVar(conf,
+        HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
+    String group = null;
+    String permission = null;
+
+    try {
+      if (inheritPerms || replace) {
+        try{
+          FileStatus deststatus = fs.getFileStatus(destf);
+          if (inheritPerms) {
+            group = deststatus.getGroup();
+            permission= Integer.toString(deststatus.getPermission().toShort(), 8);
+          }
+          //if destf is an existing directory:
+          //if replace is true, delete followed by rename (mv) is equivalent to replace;
+          //if replace is false, rename (mv) actually moves the src under the dest dir;
+          //if destf is an existing file, rename is itself a replace, and there is
+          // no need to delete the file first
+          if (replace && deststatus.isDir()) {
+            fs.delete(destf, true);
+          }
+        } catch (FileNotFoundException ignore) {
+          //if the dest dir does not exist, take group/permission from its parent dir
+          if (inheritPerms) {
+            FileStatus deststatus = fs.getFileStatus(destf.getParent());
+            group = deststatus.getGroup();
+            permission= Integer.toString(deststatus.getPermission().toShort(), 8);
+          }
+        }
+      }
+      success = fs.rename(srcf, destf);
+      LOG.debug((replace ? "Replacing src:" : "Renaming src:") + srcf.toString()
+          + ";dest: " + destf.toString()  + ";Status:" + success);
+    } catch (IOException ioe) {
+      throw new HiveException("Unable to move source" + srcf + " to destination " + destf, ioe);
+    }
+
+    if (success && inheritPerms) {
+      //use FsShell to change group and permissions recursively
+      try {
+        FsShell fshell = new FsShell();
+        fshell.setConf(conf);
+        fshell.run(new String[]{"-chgrp", "-R", group, destf.toString()});
+        fshell.run(new String[]{"-chmod", "-R", permission, destf.toString()});
+      } catch (Exception e) {
+        throw new HiveException("Unable to set permissions of " + destf, e);
+      }
+    }
+    return success;
+  }
+
   static protected void copyFiles(HiveConf conf, Path srcf, Path destf, FileSystem fs)
       throws HiveException {
+    boolean inheritPerms = HiveConf.getBoolVar(conf,
+        HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
     try {
       // create the destination if it does not exist
       if (!fs.exists(destf)) {
         fs.mkdirs(destf);
+        if (inheritPerms) {
+          fs.setPermission(destf, fs.getFileStatus(destf.getParent()).getPermission());
+        }
       }
     } catch (IOException e) {
       throw new HiveException(
@@ -2145,7 +2209,7 @@ private void constructOneLBLocationMap(F
     try {
       for (List<Path[]> sdpairs : result) {
         for (Path[] sdpair : sdpairs) {
-          if (!fs.rename(sdpair[0], sdpair[1])) {
+          if (!renameFile(conf, sdpair[0], sdpair[1], fs, false)) {
             throw new IOException("Cannot move " + sdpair[0] + " to " + sdpair[1]);
           }
         }
@@ -2174,6 +2238,8 @@ private void constructOneLBLocationMap(F
       throws HiveException {
     try {
       FileSystem fs = srcf.getFileSystem(conf);
+      boolean inheritPerms = HiveConf.getBoolVar(conf,
+          HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
 
       // check if srcf contains nested sub-directories
       FileStatus[] srcs;
@@ -2188,8 +2254,10 @@ private void constructOneLBLocationMap(F
       }
       List<List<Path[]>> result = checkPaths(conf, fs, srcs, destf, true);
 
-      // point of no return -- delete oldPath
-      if (oldPath != null) {
+      // point of no return -- delete oldPath only if it is not same as destf,
+      // otherwise, the oldPath/destf will be cleaned later just before move
+      if (oldPath != null && (!destf.getFileSystem(conf).equals(oldPath.getFileSystem(conf))
+          || !destf.equals(oldPath))) {
         try {
           FileSystem fs2 = oldPath.getFileSystem(conf);
           if (fs2.exists(oldPath)) {
@@ -2207,27 +2275,30 @@ private void constructOneLBLocationMap(F
       // rename src directory to destf
       if (srcs.length == 1 && srcs[0].isDir()) {
         // rename can fail if the parent doesn't exist
-        if (!fs.exists(destf.getParent())) {
-          fs.mkdirs(destf.getParent());
-        }
-        if (fs.exists(destf)) {
-          fs.delete(destf, true);
+        Path destfp = destf.getParent();
+        if (!fs.exists(destfp)) {
+          boolean success = fs.mkdirs(destfp);
+          if (inheritPerms && success) {
+            fs.setPermission(destfp, fs.getFileStatus(destfp.getParent()).getPermission());
+          }
         }
 
-        boolean b = fs.rename(srcs[0].getPath(), destf);
+        boolean b = renameFile(conf, srcs[0].getPath(), destf, fs, true);
         if (!b) {
           throw new HiveException("Unable to move results from " + srcs[0].getPath()
               + " to destination directory: " + destf);
         }
-        LOG.debug("Renaming:" + srcf.toString() + " to " + destf.toString()  + ",Status:" + b);
       } else { // srcf is a file or pattern containing wildcards
         if (!fs.exists(destf)) {
-          fs.mkdirs(destf);
+          boolean success = fs.mkdirs(destf);
+          if (inheritPerms && success) {
+            fs.setPermission(destf, fs.getFileStatus(destf.getParent()).getPermission());
+          }
         }
         // srcs must be a list of files -- ensured by LoadSemanticAnalyzer
         for (List<Path[]> sdpairs : result) {
           for (Path[] sdpair : sdpairs) {
-            if (!fs.rename(sdpair[0], sdpair[1])) {
+            if (!renameFile(conf, sdpair[0], sdpair[1], fs, true)) {
               throw new IOException("Error moving: " + sdpair[0] + " into: " + sdpair[1]);
             }
           }

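Two details in the Hive.java hunks above are worth illustrating together: the permission captured from the destination's FileStatus is encoded as an octal string, and group/permission are reapplied recursively with FsShell. A hedged sketch using only calls that appear in the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FsShell;
    import org.apache.hadoop.fs.permission.FsPermission;

    class InheritPermsSketch {
      // Reapply a saved group and permission to destf, recursively.
      static void applyGroupAndPerms(Configuration conf, String group,
          FsPermission perm, String destf) throws Exception {
        String octal = Integer.toString(perm.toShort(), 8); // e.g. 0755 -> "755"
        FsShell shell = new FsShell();
        shell.setConf(conf);
        shell.run(new String[]{"-chgrp", "-R", group, destf});
        shell.run(new String[]{"-chmod", "-R", octal, destf});
      }
    }
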
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java Mon Jul 29 21:08:03 2013
@@ -18,11 +18,17 @@
 
 package org.apache.hadoop.hive.ql.metadata;
 
+import org.apache.hadoop.hive.ql.ErrorMsg;
+
 /**
  * Generic exception class for Hive.
  */
 
 public class HiveException extends Exception {
+  /**
+   * Standard predefined message with error code and possibly SQL State, etc.
+   */
+  private ErrorMsg canonicalErrorMsg = ErrorMsg.GENERIC_ERROR;
   public HiveException() {
     super();
   }
@@ -38,4 +44,26 @@ public class HiveException extends Excep
   public HiveException(String message, Throwable cause) {
     super(message, cause);
   }
+
+  public HiveException(ErrorMsg message, String... msgArgs) {
+    this(null, message, msgArgs);
+  }
+
+  /**
+   * This is the recommended constructor to use since it encourages the use of
+   * canonical messages throughout.
+   * @param errorMsg Canonical error message
+   * @param msgArgs message arguments if the message is parametrized; must be {@code null} if the message takes no arguments
+   */
+  public HiveException(Throwable cause, ErrorMsg errorMsg, String... msgArgs) {
+    super(errorMsg.format(msgArgs), cause);
+    canonicalErrorMsg = errorMsg;
+
+  }
+  /**
+   * @return {@link ErrorMsg#GENERIC_ERROR} by default
+   */
+  public ErrorMsg getCanonicalErrorMsg() {
+    return canonicalErrorMsg;
+  }
 }

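A usage sketch for the new canonical-message constructor, assuming (as the InvalidTableException change below confirms) that ErrorMsg.INVALID_TABLE takes the table name as its single argument:

    import org.apache.hadoop.hive.ql.ErrorMsg;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    class CanonicalErrorSketch {
      static void requireTable(String tableName, boolean exists) throws HiveException {
        if (!exists) {
          // preferred form: canonical ErrorMsg plus its message arguments
          throw new HiveException(ErrorMsg.INVALID_TABLE, tableName);
        }
      }
    }
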
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/InvalidTableException.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/InvalidTableException.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/InvalidTableException.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/InvalidTableException.java Mon Jul 29 21:08:03 2013
@@ -18,31 +18,17 @@
 
 package org.apache.hadoop.hive.ql.metadata;
 
+import org.apache.hadoop.hive.ql.ErrorMsg;
+
 /**
  * Generic exception class for Hive.
- * 
  */
 
 public class InvalidTableException extends HiveException {
   String tableName;
 
   public InvalidTableException(String tableName) {
-    super();
-    this.tableName = tableName;
-  }
-
-  public InvalidTableException(String message, String tableName) {
-    super(message);
-    this.tableName = tableName;
-  }
-
-  public InvalidTableException(Throwable cause, String tableName) {
-    super(cause);
-    this.tableName = tableName;
-  }
-
-  public InvalidTableException(String message, Throwable cause, String tableName) {
-    super(message, cause);
+    super(ErrorMsg.INVALID_TABLE, tableName);
     this.tableName = tableName;
   }
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java Mon Jul 29 21:08:03 2013
@@ -47,6 +47,7 @@ import org.apache.hadoop.hive.ql.io.Hive
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
 import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -200,7 +201,7 @@ public class Partition implements Serial
           }
         }
         // set default if columns are not set
-        if (tPartition.getSd().getCols() == null || tPartition.getSd().getCols().size() == 0) {
+        if (tPartition.getSd().getCols() == null) {
           if (table.getCols() != null) {
             tPartition.getSd().setCols(table.getCols());
           }
@@ -519,7 +520,19 @@ public class Partition implements Serial
   }
 
   public List<FieldSchema> getCols() {
-    return tPartition.getSd().getCols();
+    if (!SerDeUtils.shouldGetColsFromSerDe(
+        tPartition.getSd().getSerdeInfo().getSerializationLib())) {
+      return tPartition.getSd().getCols();
+    }
+
+    try {
+      return Hive.getFieldsFromDeserializer(table.getTableName(), getDeserializer());
+    } catch (HiveException e) {
+      LOG.error("Unable to get cols from serde: " +
+          tPartition.getSd().getSerdeInfo().getSerializationLib(), e);
+    }
+
+    return new ArrayList<FieldSchema>();
   }
 
   public String getLocation() {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java Mon Jul 29 21:08:03 2013
@@ -48,12 +48,12 @@ import org.codehaus.jackson.map.ObjectMa
  * json.
  */
 public class JsonMetaDataFormatter implements MetaDataFormatter {
-    private static final Log LOG = LogFactory.getLog("hive.ql.exec.DDLTask");
+    private static final Log LOG = LogFactory.getLog(JsonMetaDataFormatter.class);
 
     /**
      * Convert the map to a JSON string.
      */
-    public void asJson(OutputStream out, Map<String, Object> data)
+    private void asJson(OutputStream out, Map<String, Object> data)
         throws HiveException
     {
         try {
@@ -66,70 +66,29 @@ public class JsonMetaDataFormatter imple
     /**
      * Write an error message.
      */
-    public void error(OutputStream out, String msg, int errorCode)
+    @Override
+    public void error(OutputStream out, String msg, int errorCode, String sqlState)
         throws HiveException
     {
-        asJson(out,
-               MapBuilder.create()
-               .put("error", msg)
-               .put("errorCode", errorCode)
-               .build());
+        error(out, msg, errorCode, sqlState, null);
     }
-
-    /**
-     * Write a log warn message.
-     */
-    public void logWarn(OutputStream out, String msg, int errorCode)
-        throws HiveException
-    {
-        LOG.warn(msg);
-        error(out, msg, errorCode);
-    }
-
-    /**
-     * Write a log info message.
-     */
-    public void logInfo(OutputStream out, String msg, int errorCode)
-        throws HiveException
-    {
-        LOG.info(msg);
-        error(out, msg, errorCode);
-    }
-
-    /**
-     * Write a console error message.
-     */
-    public void consoleError(LogHelper console, String msg, int errorCode) {
-        try {
-            console.printError(msg);
-            error(console.getOutStream(), msg, errorCode);
-        } catch (HiveException e) {
-            console.printError("unable to create json: " + e);
+    @Override
+    public void error(OutputStream out, String errorMessage, int errorCode, String sqlState, String errorDetail) throws HiveException {
+        MapBuilder mb = MapBuilder.create().put("error", errorMessage);
+        if(errorDetail != null) {
+            mb.put("errorDetail", errorDetail);
         }
-    }
-
-    /**
-     * Write a console error message.
-     */
-    public void consoleError(LogHelper console, String msg, String detail,
-                             int errorCode)
-    {
-        try {
-            console.printError(msg, detail);
-            asJson(console.getOutStream(),
-                   MapBuilder.create()
-                   .put("error", msg)
-                   .put("errorDetail", detail)
-                   .put("errorCode", errorCode)
-                   .build());
-        } catch (HiveException e) {
-            console.printError("unable to create json: " + e);
+        mb.put("errorCode", errorCode);
+        if(sqlState != null) {
+          mb.put("sqlState", sqlState);
         }
+        asJson(out,mb.build());
     }
 
     /**
      * Show a list of tables.
      */
+    @Override
     public void showTables(DataOutputStream out, Set<String> tables)
         throws HiveException
     {
@@ -142,6 +101,7 @@ public class JsonMetaDataFormatter imple
     /**
      * Describe table.
      */
+    @Override
     public void describeTable(DataOutputStream out,
                               String colPath, String tableName,
                               Table tbl, Partition part, List<FieldSchema> cols,
@@ -178,6 +138,7 @@ public class JsonMetaDataFormatter imple
             .build();
     }
 
+    @Override
     public void showTableStatus(DataOutputStream out,
                                 Hive db,
                                 HiveConf conf,
@@ -366,6 +327,7 @@ public class JsonMetaDataFormatter imple
     /**
      * Show the table partitions.
      */
+    @Override
     public void showTablePartitons(DataOutputStream out, List<String> parts)
         throws HiveException
     {
@@ -424,6 +386,7 @@ public class JsonMetaDataFormatter imple
     /**
      * Show a list of databases
      */
+    @Override
     public void showDatabases(DataOutputStream out, List<String> databases)
         throws HiveException
     {
@@ -436,6 +399,7 @@ public class JsonMetaDataFormatter imple
     /**
      * Show the description of a database
      */
+    @Override
     public void showDatabaseDescription(DataOutputStream out,
                                         String database,
                                         String comment,

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java Mon Jul 29 21:08:03 2013
@@ -28,6 +28,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Index;
@@ -325,4 +326,11 @@ public final class MetaDataFormatUtils {
     formatOutput(ShowIndexesDesc.getSchema().split("#")[0].split(","), indexCols);
     return indexCols.toString();
   }
+  public static MetaDataFormatter getFormatter(HiveConf conf) {
+    if ("json".equals(conf.get(HiveConf.ConfVars.HIVE_DDL_OUTPUT_FORMAT.varname, "text"))) {
+      return new JsonMetaDataFormatter();
+    } else {
+      return new TextMetaDataFormatter(conf.getIntVar(HiveConf.ConfVars.CLIPRETTYOUTPUTNUMCOLS));
+    }
+  }
 }

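A sketch of how a caller might combine the new getFormatter factory above with the consolidated error signature from the MetaDataFormatter changes below (assumes the caller sits in the same formatting package; 500 echoes the generic HTTP-style code removed from the interface):

    import java.io.OutputStream;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    class FormatterUsageSketch {
      static void emitError(HiveConf conf, OutputStream out, String msg)
          throws HiveException {
        MetaDataFormatter fmt = MetaDataFormatUtils.getFormatter(conf);
        fmt.error(out, msg, 500, null); // null sqlState is ignored
      }
    }
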
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatter.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatter.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatter.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatter.java Mon Jul 29 21:08:03 2013
@@ -38,49 +38,18 @@ import org.apache.hadoop.hive.ql.session
  */
 public interface MetaDataFormatter {
     /**
-     * Generic error code.  This and the other error codes are
-     * designed to match the HTTP status codes.
-     */
-    static final int ERROR = 500;
-
-    /**
-     * Missing error code.
-     */
-    static final int MISSING = 404;
-
-    /**
-     * Conflict error code.
-     */
-    static final int CONFLICT = 409;
-
-    /**
      * Write an error message.
+     * @param sqlState if {@code null}, will be ignored
      */
-    public void error(OutputStream out, String msg, int errorCode)
-        throws HiveException;
-
-    /**
-     * Write a log warn message.
-     */
-    public void logWarn(OutputStream out, String msg, int errorCode)
-        throws HiveException;
-
-    /**
-     * Write a log info message.
-     */
-    public void logInfo(OutputStream out, String msg, int errorCode)
+    public void error(OutputStream out, String msg, int errorCode, String sqlState)
         throws HiveException;
 
-    /**
-     * Write a console error message.
-     */
-    public void consoleError(LogHelper console, String msg, int errorCode);
-
-    /**
-     * Write a console error message.
-     */
-    public void consoleError(LogHelper console, String msg, String detail,
-                             int errorCode);
+  /**
+   * @param sqlState if {@code null}, will be skipped in output
+   * @param errorDetail usually string version of some Exception, if {@code null}, will be ignored
+   */
+    public void error(OutputStream out, String errorMessage, int errorCode, String sqlState, String errorDetail)
+          throws HiveException;
 
     /**
      * Show a list of tables.

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java Mon Jul 29 21:08:03 2013
@@ -49,8 +49,8 @@ import org.apache.hadoop.hive.shims.Shim
  * Format table and index information for human readability using
  * simple lines of text.
  */
-public class TextMetaDataFormatter implements MetaDataFormatter {
-    private static final Log LOG = LogFactory.getLog("hive.ql.exec.DDLTask");
+class TextMetaDataFormatter implements MetaDataFormatter {
+    private static final Log LOG = LogFactory.getLog(TextMetaDataFormatter.class);
 
     private static final int separator = Utilities.tabCode;
     private static final int terminator = Utilities.newLineCode;
@@ -67,54 +67,35 @@ public class TextMetaDataFormatter imple
     /**
      * Write an error message.
      */
-    public void error(OutputStream out, String msg, int errorCode)
+    @Override
+    public void error(OutputStream out, String msg, int errorCode, String sqlState)
         throws HiveException
     {
-        try {
-            out.write(msg.getBytes("UTF-8"));
-            out.write(terminator);
-        } catch (Exception e) {
-            throw new HiveException(e);
-        }
-    }
-
-    /**
-     * Write a log warn message.
-     */
-    public void logWarn(OutputStream out, String msg, int errorCode)
-        throws HiveException
-    {
-        LOG.warn(msg);
+        error(out, msg, errorCode, sqlState, null);
     }
 
-    /**
-     * Write a log info message.
-     */
-    public void logInfo(OutputStream out, String msg, int errorCode)
-        throws HiveException
+    @Override
+    public void error(OutputStream out, String errorMessage, int errorCode, String sqlState, String errorDetail)
+          throws HiveException
     {
-        LOG.info(msg);
-    }
-
-    /**
-     * Write a console error message.
-     */
-    public void consoleError(LogHelper console, String msg, int errorCode) {
-        console.printError(msg);
-    }
-
-    /**
-     * Write a console error message.
-     */
-    public void consoleError(LogHelper console, String msg, String detail,
-                             int errorCode)
-    {
-        console.printError(msg, detail);
+      try {
+        out.write(errorMessage.getBytes("UTF-8"));
+        if (errorDetail != null) {
+          out.write(errorDetail.getBytes("UTF-8"));
+        }
+        out.write(errorCode);
+        if (sqlState != null) {
+          out.write(sqlState.getBytes("UTF-8")); // this breaks all the tests in .q files
+        }
+        out.write(terminator);
+      } catch (Exception e) {
+        throw new HiveException(e);
+      }
     }
-
     /**
      * Show a list of tables.
      */
+    @Override
     public void showTables(DataOutputStream out, Set<String> tables)
         throws HiveException
     {
@@ -131,33 +112,34 @@ public class TextMetaDataFormatter imple
         }
     }
 
+    @Override
     public void describeTable(DataOutputStream outStream,
                               String colPath, String tableName,
                               Table tbl, Partition part, List<FieldSchema> cols,
                               boolean isFormatted, boolean isExt, boolean isPretty)
          throws HiveException {
         try {
+          String output;
           if (colPath.equals(tableName)) {
             List<FieldSchema> partCols = tbl.isPartitioned() ? tbl.getPartCols() : null;
-            outStream.writeBytes(
-              isPretty ?
-                  MetaDataPrettyFormatUtils.getAllColumnsInformation(
-                      cols, partCols, prettyOutputNumCols)
+            output = isPretty ?
+                MetaDataPrettyFormatUtils.getAllColumnsInformation(
+                    cols, partCols, prettyOutputNumCols)
                 :
-                  MetaDataFormatUtils.getAllColumnsInformation(cols, partCols, isFormatted)
-              );
+                MetaDataFormatUtils.getAllColumnsInformation(cols, partCols, isFormatted);
           } else {
-            outStream.writeBytes(
-                MetaDataFormatUtils.getAllColumnsInformation(cols, isFormatted));
+            output = MetaDataFormatUtils.getAllColumnsInformation(cols, isFormatted);
           }
+          outStream.write(output.getBytes());
 
           if (tableName.equals(colPath)) {
             if (isFormatted) {
               if (part != null) {
-                outStream.writeBytes(MetaDataFormatUtils.getPartitionInformation(part));
+                output = MetaDataFormatUtils.getPartitionInformation(part);
               } else {
-                outStream.writeBytes(MetaDataFormatUtils.getTableInformation(tbl));
+                output = MetaDataFormatUtils.getTableInformation(tbl);
               }
+              outStream.write(output.getBytes());
             }
 
           // if extended desc table then show the complete details of the table
@@ -168,7 +150,7 @@ public class TextMetaDataFormatter imple
                 // show partition information
                 outStream.writeBytes("Detailed Partition Information");
                 outStream.write(separator);
-                outStream.writeBytes(part.getTPartition().toString());
+                outStream.write(part.getTPartition().toString().getBytes());
                 outStream.write(separator);
                 // comment column is empty
                 outStream.write(terminator);
@@ -176,7 +158,7 @@ public class TextMetaDataFormatter imple
                 // show table information
                 outStream.writeBytes("Detailed Table Information");
                 outStream.write(separator);
-                outStream.writeBytes(tbl.getTTable().toString());
+                outStream.write(tbl.getTTable().toString().getBytes());
                 outStream.write(separator);
                 outStream.write(terminator);
               }
@@ -187,6 +169,7 @@ public class TextMetaDataFormatter imple
         }
     }
 
+    @Override
     public void showTableStatus(DataOutputStream outStream,
                                 Hive db,
                                 HiveConf conf,
@@ -406,6 +389,7 @@ public class TextMetaDataFormatter imple
     /**
      * Show the table partitions.
      */
+    @Override
     public void showTablePartitons(DataOutputStream outStream, List<String> parts)
         throws HiveException
     {
@@ -430,6 +414,7 @@ public class TextMetaDataFormatter imple
     /**
      * Show the list of databases
      */
+    @Override
     public void showDatabases(DataOutputStream outStream, List<String> databases)
         throws HiveException
         {
@@ -447,6 +432,7 @@ public class TextMetaDataFormatter imple
     /**
      * Describe a database
      */
+    @Override
     public void showDatabaseDescription(DataOutputStream outStream,
                                         String database,
                                         String comment,
@@ -458,7 +444,7 @@ public class TextMetaDataFormatter imple
             outStream.writeBytes(database);
             outStream.write(separator);
             if (comment != null) {
-              outStream.writeBytes(comment);
+              outStream.write(comment.getBytes());
             }
             outStream.write(separator);
             if (location != null) {
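
[Editor's note, not part of the patch] The writeBytes(String) ->
write(String.getBytes()) changes in this file matter for non-ASCII metadata:
DataOutputStream.writeBytes() discards the high eight bits of every char, so
multi-byte characters are mangled, while getBytes() runs a real charset
encoding (platform default here, UTF-8 in the error path). Note also that
OutputStream.write(int) in the new error() emits a single byte, the low eight
bits of errorCode, not its decimal string. A runnable demonstration of the
writeBytes pitfall:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;

    public class WriteBytesDemo {
      public static void main(String[] args) throws Exception {
        String s = "caf\u00e9";                 // "café"; 'é' is U+00E9
        ByteArrayOutputStream a = new ByteArrayOutputStream();
        new DataOutputStream(a).writeBytes(s);  // low byte per char: 'é' -> 0xE9
        ByteArrayOutputStream b = new ByteArrayOutputStream();
        b.write(s.getBytes("UTF-8"));           // real encoding: 'é' -> 0xC3 0xA9
        System.out.println(a.size() + " vs " + b.size());  // prints "4 vs 5"
      }
    }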

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Mon Jul 29 21:08:03 2013
@@ -57,6 +57,7 @@ import org.apache.hadoop.hive.ql.plan.Co
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -91,6 +92,8 @@ public class GenMRFileSink1 implements N
     ParseContext parseCtx = ctx.getParseCtx();
     boolean chDir = false;
     Task<? extends Serializable> currTask = ctx.getCurrTask();
+    ctx.addRootIfPossible(currTask);
+
     FileSinkOperator fsOp = (FileSinkOperator) nd;
     boolean isInsertTable = // is INSERT OVERWRITE TABLE
     fsOp.getConf().getTableInfo().getTableName() != null &&
@@ -106,7 +109,7 @@ public class GenMRFileSink1 implements N
     if (fileSinkDescs != null) {
       Task<? extends Serializable> childTask = fileSinkDescs.get(fsOp.getConf());
       processLinkedFileDesc(ctx, childTask);
-      return null;
+      return true;
     }
 
     // Has the user enabled merging of files for map-only jobs or for all jobs
@@ -142,10 +145,10 @@ public class GenMRFileSink1 implements N
               // or for a map-reduce job
               MapredWork currWork = (MapredWork) currTask.getWork();
               boolean mergeMapOnly =
-                  hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && currWork.getReducer() == null;
+                  hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && currWork.getReduceWork() == null;
               boolean mergeMapRed =
                   hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES) &&
-                      currWork.getReducer() != null;
+                      currWork.getReduceWork() != null;
               if (mergeMapOnly || mergeMapRed) {
                 chDir = true;
               }
@@ -181,7 +184,7 @@ public class GenMRFileSink1 implements N
       }
     }
 
-    return null;
+    return true;
   }
 
   /*
@@ -189,26 +192,12 @@ public class GenMRFileSink1 implements N
    * Use the task created by the first linked file descriptor
    */
   private void processLinkedFileDesc(GenMRProcContext ctx,
-    Task<? extends Serializable> childTask)
-    throws SemanticException {
-    Operator<? extends OperatorDesc> currTopOp = ctx.getCurrTopOp();
-    String currAliasId = ctx.getCurrAliasId();
-    List<Operator<? extends OperatorDesc>> seenOps = ctx.getSeenOps();
-    List<Task<? extends Serializable>> rootTasks = ctx.getRootTasks();
+      Task<? extends Serializable> childTask) throws SemanticException {
     Task<? extends Serializable> currTask = ctx.getCurrTask();
-
-    if (currTopOp != null) {
-      if (!seenOps.contains(currTopOp)) {
-        seenOps.add(currTopOp);
-        GenMapRedUtils.setTaskPlan(currAliasId, currTopOp,
-          (MapredWork) currTask.getWork(), false, ctx);
-      }
-
-      if (!rootTasks.contains(currTask)
-          && (currTask.getParentTasks() == null
-              || currTask.getParentTasks().isEmpty())) {
-        rootTasks.add(currTask);
-      }
+    Operator<? extends OperatorDesc> currTopOp = ctx.getCurrTopOp();
+    if (currTopOp != null && !ctx.isSeenOp(currTask, currTopOp)) {
+      String currAliasId = ctx.getCurrAliasId();
+      GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, false, ctx);
     }
 
     if (childTask != null) {
@@ -251,7 +240,10 @@ public class GenMRFileSink1 implements N
 
     // mark the MapredWork and FileSinkOperator for gathering stats
     nd.getConf().setGatherStats(true);
-    mrWork.setGatheringStats(true);
+    mrWork.getMapWork().setGatheringStats(true);
+    if (mrWork.getReduceWork() != null) {
+      mrWork.getReduceWork().setGatheringStats(true);
+    }
     nd.getConf().setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE));
     nd.getConf().setMaxStatsKeyPrefixLength(
         hconf.getIntVar(ConfVars.HIVE_STATS_KEY_PREFIX_MAX_LENGTH));
@@ -357,7 +349,8 @@ public class GenMRFileSink1 implements N
     //
     MoveWork dummyMv = new MoveWork(null, null, null,
         new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false);
-    MapredWork cplan;
+    MapWork cplan;
+    Serializable work;
 
     if (conf.getBoolVar(ConfVars.HIVEMERGERCFILEBLOCKLEVEL) &&
         fsInputDesc.getTableInfo().getInputFileFormatClass().equals(RCFileInputFormat.class)) {
@@ -370,6 +363,7 @@ public class GenMRFileSink1 implements N
         LOG.info("RCFile format- Using block level merge");
         cplan = createRCFileMergeTask(fsInputDesc, finalName,
             dpCtx != null && dpCtx.getNumDPCols() > 0);
+        work = cplan;
       } catch (ClassNotFoundException e) {
         String msg = "Illegal input format class: " + inputFormatClass;
         throw new SemanticException(msg);
@@ -377,12 +371,14 @@ public class GenMRFileSink1 implements N
 
     } else {
       cplan = createMRWorkForMergingFiles(conf, tsMerge, fsInputDesc);
+      work = new MapredWork();
+      ((MapredWork)work).setMapWork(cplan);
       // use CombineHiveInputFormat for map-only merging
     }
     cplan.setInputformat("org.apache.hadoop.hive.ql.io.CombineHiveInputFormat");
     // NOTE: we should gather stats in MR1 rather than MR2 at merge job since we don't
     // know if merge MR2 will be triggered at execution time
-    ConditionalTask cndTsk = createCondTask(conf, ctx.getCurrTask(), dummyMv, cplan,
+    ConditionalTask cndTsk = createCondTask(conf, ctx.getCurrTask(), dummyMv, work,
         fsInputDesc.getFinalDirName());
 
     // keep the dynamic partition context in conditional task resolver context
@@ -483,7 +479,7 @@ public class GenMRFileSink1 implements N
    *          the last FileSinkOperator in the parent MapReduce work
    * @return the MapredWork
    */
-  private MapredWork createMRWorkForMergingFiles (HiveConf conf,
+  private MapWork createMRWorkForMergingFiles(HiveConf conf,
     Operator<? extends OperatorDesc> topOp,  FileSinkDesc fsDesc) {
 
     ArrayList<String> aliases = new ArrayList<String>();
@@ -492,10 +488,10 @@ public class GenMRFileSink1 implements N
     aliases.add(inputDir); // dummy alias: just use the input path
 
     // constructing the default MapredWork
-    MapredWork cplan = GenMapRedUtils.getMapRedWorkFromConf(conf);
+    MapredWork cMrPlan = GenMapRedUtils.getMapRedWorkFromConf(conf);
+    MapWork cplan = cMrPlan.getMapWork();
     cplan.getPathToAliases().put(inputDir, aliases);
     cplan.getPathToPartitionInfo().put(inputDir, new PartitionDesc(tblDesc, null));
-    cplan.setNumReduceTasks(0);
     cplan.getAliasToWork().put(inputDir, topOp);
     cplan.setMapperCannotSpanPartns(true);
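
[Editor's note, not part of the patch] The merge-task plumbing now distinguishes
a bare MapWork (RCFile block-level merge) from a MapredWork wrapping one
(map-only MR merge), which is why createCondTask() widens its parameter to
Serializable. The shape these hunks assume, sketched (constructor usage is an
assumption):

    import java.io.Serializable;
    import org.apache.hadoop.hive.ql.plan.MapWork;
    import org.apache.hadoop.hive.ql.plan.MapredWork;

    public class MergeWorkSketch {
      // MapredWork is now a thin container; map-only jobs keep ReduceWork null.
      static Serializable wrapForMapOnlyMerge(MapWork cplan) {
        MapredWork mr = new MapredWork();
        mr.setMapWork(cplan);
        return mr;  // mr.getReduceWork() == null replaces getReducer() == null
      }
    }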
 
@@ -510,7 +506,7 @@ public class GenMRFileSink1 implements N
    * @return MergeWork if table is stored as RCFile,
    *         null otherwise
    */
-  private MapredWork createRCFileMergeTask(FileSinkDesc fsInputDesc,
+  private MapWork createRCFileMergeTask(FileSinkDesc fsInputDesc,
       String finalName, boolean hasDynamicPartitions) throws SemanticException {
 
     String inputDir = fsInputDesc.getFinalDirName();
@@ -573,7 +569,7 @@ public class GenMRFileSink1 implements N
    */
   private ConditionalTask createCondTask(HiveConf conf,
       Task<? extends Serializable> currTask, MoveWork mvWork,
-      MapredWork mergeWork, String inputPath) {
+      Serializable mergeWork, String inputPath) {
 
     // There are 3 options for this ConditionalTask:
     // 1) Merge the partitions
@@ -702,8 +698,6 @@ public class GenMRFileSink1 implements N
     String currAliasId = ctx.getCurrAliasId();
     HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
         ctx.getOpTaskMap();
-    List<Operator<? extends OperatorDesc>> seenOps = ctx.getSeenOps();
-    List<Task<? extends Serializable>> rootTasks = ctx.getRootTasks();
 
     // Set the move task to be dependent on the current task
     if (mvTask != null) {
@@ -717,22 +711,13 @@ public class GenMRFileSink1 implements N
     if (currTopOp != null) {
       Task<? extends Serializable> mapTask = opTaskMap.get(null);
       if (mapTask == null) {
-        if (!seenOps.contains(currTopOp)) {
-          seenOps.add(currTopOp);
-          GenMapRedUtils.setTaskPlan(currAliasId, currTopOp,
-              (MapredWork) currTask.getWork(), false, ctx);
+        if (!ctx.isSeenOp(currTask, currTopOp)) {
+          GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, false, ctx);
         }
         opTaskMap.put(null, currTask);
-        if (!rootTasks.contains(currTask)
-            && (currTask.getParentTasks() == null
-                || currTask.getParentTasks().isEmpty())) {
-          rootTasks.add(currTask);
-        }
       } else {
-        if (!seenOps.contains(currTopOp)) {
-          seenOps.add(currTopOp);
-          GenMapRedUtils.setTaskPlan(currAliasId, currTopOp,
-              (MapredWork) mapTask.getWork(), false, ctx);
+        if (!ctx.isSeenOp(currTask, currTopOp)) {
+          GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, mapTask, false, ctx);
         } else {
           UnionOperator currUnionOp = ctx.getCurrUnionOp();
           if (currUnionOp != null) {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMROperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMROperator.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMROperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMROperator.java Mon Jul 29 21:08:03 2013
@@ -53,8 +53,7 @@ public class GenMROperator implements No
         .getMapCurrCtx();
     GenMapRedCtx mapredCtx = mapCurrCtx.get(stack.get(stack.size() - 2));
     mapCurrCtx.put((Operator<? extends OperatorDesc>) nd, new GenMapRedCtx(
-        mapredCtx.getCurrTask(), mapredCtx.getCurrTopOp(), mapredCtx
-        .getCurrAliasId()));
-    return null;
+        mapredCtx.getCurrTask(), mapredCtx.getCurrAliasId()));
+    return true;
   }
 }
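
[Editor's note, not part of the patch] Across the GenMR* processors this commit
changes the process() return value from null to a Boolean. Judging from the
call sites, true appears to mean "keep walking this branch" and false "this
branch is finished" (see hasBranchFinished in the reduce-sink processors
below). A minimal processor under that assumed contract:

    import java.util.Stack;
    import org.apache.hadoop.hive.ql.lib.Node;
    import org.apache.hadoop.hive.ql.lib.NodeProcessor;
    import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
    import org.apache.hadoop.hive.ql.parse.SemanticException;

    // Sketch only: a processor that always lets the walk continue.
    public class ContinueProcessor implements NodeProcessor {
      @Override
      public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
          Object... nodeOutputs) throws SemanticException {
        return true;  // assumed meaning: branch not finished, keep walking
      }
    }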

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java Mon Jul 29 21:08:03 2013
@@ -55,7 +55,6 @@ public class GenMRProcContext implements
    */
   public static class GenMapRedCtx {
     Task<? extends Serializable> currTask;
-    Operator<? extends OperatorDesc> currTopOp;
     String currAliasId;
 
     public GenMapRedCtx() {
@@ -64,15 +63,10 @@ public class GenMRProcContext implements
     /**
      * @param currTask
      *          the current task
-     * @param currTopOp
-     *          the current top operator being traversed
      * @param currAliasId
-     *          the current alias for the to operator
      */
-    public GenMapRedCtx(Task<? extends Serializable> currTask,
-        Operator<? extends OperatorDesc> currTopOp, String currAliasId) {
+    public GenMapRedCtx(Task<? extends Serializable> currTask, String currAliasId) {
       this.currTask = currTask;
-      this.currTopOp = currTopOp;
       this.currAliasId = currAliasId;
     }
 
@@ -84,13 +78,6 @@ public class GenMRProcContext implements
     }
 
     /**
-     * @return current top operator
-     */
-    public Operator<? extends OperatorDesc> getCurrTopOp() {
-      return currTopOp;
-    }
-
-    /**
      * @return current alias
      */
     public String getCurrAliasId() {
@@ -103,13 +90,13 @@ public class GenMRProcContext implements
    *
    */
   public static class GenMRUnionCtx {
-    Task<? extends Serializable> uTask;
+    final Task<? extends Serializable> uTask;
     List<String> taskTmpDir;
     List<TableDesc> tt_desc;
     List<Operator<? extends OperatorDesc>> listTopOperators;
 
-    public GenMRUnionCtx() {
-      uTask = null;
+    public GenMRUnionCtx(Task<? extends Serializable> uTask) {
+      this.uTask = uTask;
       taskTmpDir = new ArrayList<String>();
       tt_desc = new ArrayList<TableDesc>();
       listTopOperators = new ArrayList<Operator<? extends OperatorDesc>>();
@@ -119,10 +106,6 @@ public class GenMRProcContext implements
       return uTask;
     }
 
-    public void setUTask(Task<? extends Serializable> uTask) {
-      this.uTask = uTask;
-    }
-
     public void addTaskTmpDir(String taskTmpDir) {
       this.taskTmpDir.add(taskTmpDir);
     }
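
[Editor's note, not part of the patch] Making uTask final and
constructor-injected closes the window where a GenMRUnionCtx existed without
its task. A hypothetical construction site (the TaskFactory call is an
assumption, not taken from this patch):

    // The union task must now exist before the context does; there is no setter.
    Task<? extends Serializable> uTask = TaskFactory.get(new MapredWork(), conf);
    GenMRUnionCtx uCtx = new GenMRUnionCtx(uTask);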
@@ -156,8 +139,10 @@ public class GenMRProcContext implements
   private HiveConf conf;
   private
     HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap;
+  private
+    HashMap<Task<? extends Serializable>, List<Operator<? extends OperatorDesc>>> taskToSeenOps;
+
   private HashMap<UnionOperator, GenMRUnionCtx> unionTaskMap;
-  private List<Operator<? extends OperatorDesc>> seenOps;
   private List<FileSinkOperator> seenFileSinkOps;
 
   private ParseContext parseCtx;
@@ -169,7 +154,6 @@ public class GenMRProcContext implements
   private Operator<? extends OperatorDesc> currTopOp;
   private UnionOperator currUnionOp;
   private String currAliasId;
-  private List<Operator<? extends OperatorDesc>> rootOps;
   private DependencyCollectionTask dependencyTaskForMultiInsert;
 
   // If many fileSinkDescs are linked to each other, it is a good idea to keep track of
@@ -213,14 +197,13 @@ public class GenMRProcContext implements
   public GenMRProcContext(
       HiveConf conf,
       HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap,
-      List<Operator<? extends OperatorDesc>> seenOps, ParseContext parseCtx,
+      ParseContext parseCtx,
       List<Task<MoveWork>> mvTask,
       List<Task<? extends Serializable>> rootTasks,
       LinkedHashMap<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx,
       Set<ReadEntity> inputs, Set<WriteEntity> outputs) {
     this.conf = conf;
     this.opTaskMap = opTaskMap;
-    this.seenOps = seenOps;
     this.mvTask = mvTask;
     this.parseCtx = parseCtx;
     this.rootTasks = rootTasks;
@@ -231,9 +214,9 @@ public class GenMRProcContext implements
     currTopOp = null;
     currUnionOp = null;
     currAliasId = null;
-    rootOps = new ArrayList<Operator<? extends OperatorDesc>>();
-    rootOps.addAll(parseCtx.getTopOps().values());
     unionTaskMap = new HashMap<UnionOperator, GenMRUnionCtx>();
+    taskToSeenOps = new HashMap<Task<? extends Serializable>,
+        List<Operator<? extends OperatorDesc>>>();
     dependencyTaskForMultiInsert = null;
     linkedFileDescTasks = null;
   }
@@ -255,11 +238,17 @@ public class GenMRProcContext implements
     this.opTaskMap = opTaskMap;
   }
 
-  /**
-   * @return operators already visited
-   */
-  public List<Operator<? extends OperatorDesc>> getSeenOps() {
-    return seenOps;
+  public boolean isSeenOp(Task task, Operator operator) {
+    List<Operator<? extends OperatorDesc>> seenOps = taskToSeenOps.get(task);
+    return seenOps != null && seenOps.contains(operator);
+  }
+
+  public void addSeenOp(Task task, Operator operator) {
+    List<Operator<? extends OperatorDesc>> seenOps = taskToSeenOps.get(task);
+    if (seenOps == null) {
+      taskToSeenOps.put(task, seenOps = new ArrayList<Operator<? extends OperatorDesc>>());
+    }
+    seenOps.add(operator);
   }
 
   /**
@@ -270,14 +259,6 @@ public class GenMRProcContext implements
   }
 
   /**
-   * @param seenOps
-   *          operators already visited
-   */
-  public void setSeenOps(List<Operator<? extends OperatorDesc>> seenOps) {
-    this.seenOps = seenOps;
-  }
-
-  /**
    * @param seenFileSinkOps
    *          file sink operators already visited
    */
@@ -286,21 +267,6 @@ public class GenMRProcContext implements
   }
 
   /**
-   * @return top operators for tasks
-   */
-  public List<Operator<? extends OperatorDesc>> getRootOps() {
-    return rootOps;
-  }
-
-  /**
-   * @param rootOps
-   *          top operators for tasks
-   */
-  public void setRootOps(List<Operator<? extends OperatorDesc>> rootOps) {
-    this.rootOps = rootOps;
-  }
-
-  /**
    * @return current parse context
    */
   public ParseContext getParseCtx() {
@@ -345,6 +311,15 @@ public class GenMRProcContext implements
     this.rootTasks = rootTasks;
   }
 
+  public boolean addRootIfPossible(Task<? extends Serializable> task) {
+    if (task.getParentTasks() == null || task.getParentTasks().isEmpty()) {
+      if (!rootTasks.contains(task)) {
+        return rootTasks.add(task);
+      }
+    }
+    return false;
+  }
+
   /**
    * @return operator to task mappings
    */

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java Mon Jul 29 21:08:03 2013
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
 import java.io.Serializable;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Stack;
 
@@ -64,20 +63,21 @@ public class GenMRRedSink1 implements No
     GenMapRedCtx mapredCtx = mapCurrCtx.get(stack.get(stack.size() - 2));
     Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
     MapredWork currPlan = (MapredWork) currTask.getWork();
-    Operator<? extends OperatorDesc> currTopOp = mapredCtx.getCurrTopOp();
     String currAliasId = mapredCtx.getCurrAliasId();
+
+    if (op.getNumChild() != 1) {
+      throw new IllegalStateException("Expecting operator " + op + " to have one child, " +
+          "but found multiple children: " + op.getChildOperators());
+    }
     Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
-    HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap = ctx
-        .getOpTaskMap();
-    Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
+    Task<? extends Serializable> oldTask = ctx.getOpTaskMap().get(reducer);
 
-    ctx.setCurrTopOp(currTopOp);
     ctx.setCurrAliasId(currAliasId);
     ctx.setCurrTask(currTask);
 
     // If the plan for this reducer does not exist, initialize the plan
-    if (opMapTask == null) {
-      if (currPlan.getReducer() == null) {
+    if (oldTask == null) {
+      if (currPlan.getReduceWork() == null) {
         GenMapRedUtils.initPlan(op, ctx);
       } else {
         GenMapRedUtils.splitPlan(op, ctx);
@@ -85,14 +85,18 @@ public class GenMRRedSink1 implements No
     } else {
       // This will happen in case of joins. The current plan can be thrown away
       // after being merged with the original plan
-      GenMapRedUtils.joinPlan(op, null, opMapTask, ctx, -1, false);
-      currTask = opMapTask;
+      GenMapRedUtils.joinPlan(currTask, oldTask, ctx);
+      currTask = oldTask;
       ctx.setCurrTask(currTask);
     }
 
-    mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(),
-        ctx.getCurrAliasId()));
-    return null;
-  }
+    mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrAliasId()));
 
+    if (GenMapRedUtils.hasBranchFinished(nodeOutputs)) {
+      ctx.addRootIfPossible(currTask);
+      return false;
+    }
+
+    return true;
+  }
 }
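
[Editor's note, not part of the patch] hasBranchFinished() itself is not shown
in this mail. One plausible implementation, consistent with walkers that record
a null output for branches not yet visited; purely illustrative, the real
helper lives in GenMapRedUtils and may differ:

    // Illustrative only: a branch counts as finished once every child
    // has produced a (non-null) output.
    public static boolean hasBranchFinished(Object... nodeOutputs) {
      for (Object output : nodeOutputs) {
        if (output == null) {
          return false;  // some child branch has not been walked yet
        }
      }
      return true;
    }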

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java Mon Jul 29 21:08:03 2013
@@ -57,28 +57,32 @@ public class GenMRRedSink2 implements No
         .getMapCurrCtx();
     GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(0));
     Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
-    Operator<? extends OperatorDesc> currTopOp = mapredCtx.getCurrTopOp();
     String currAliasId = mapredCtx.getCurrAliasId();
     Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
     Map<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap = ctx
         .getOpTaskMap();
-    Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
+    Task<? extends Serializable> oldTask = opTaskMap.get(reducer);
 
-    ctx.setCurrTopOp(currTopOp);
     ctx.setCurrAliasId(currAliasId);
     ctx.setCurrTask(currTask);
 
-    if (opMapTask == null) {
+    if (oldTask == null) {
       GenMapRedUtils.splitPlan(op, ctx);
     } else {
-      GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx, -1, true);
-      currTask = opMapTask;
+      GenMapRedUtils.splitPlan(op, currTask, oldTask, ctx);
+      currTask = oldTask;
       ctx.setCurrTask(currTask);
     }
 
-    mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(),
+    mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(),
         ctx.getCurrAliasId()));
-    return null;
+
+    if (GenMapRedUtils.hasBranchFinished(nodeOutputs)) {
+      ctx.addRootIfPossible(currTask);
+      return false;
+    }
+
+    return true;
   }
 
 }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java Mon Jul 29 21:08:03 2013
@@ -85,13 +85,13 @@ public class GenMRRedSink3 implements No
     // If the plan for this reducer does not exist, initialize the plan
     if (reducerTask == null) {
       // When the reducer is encountered for the first time
-      if (plan.getReducer() == null) {
+      if (plan.getReduceWork() == null) {
         GenMapRedUtils.initUnionPlan(op, union, ctx, unionTask);
         // When union is followed by a multi-table insert
       } else {
         GenMapRedUtils.splitPlan(op, ctx);
       }
-    } else if (plan.getReducer() == reducer) {
+    } else if (plan.getReduceWork() != null && plan.getReduceWork().getReducer() == reducer) {
       // The union is already initialized. However, the union is walked from
       // another input
       // initUnionPlan is idempotent
@@ -101,11 +101,11 @@ public class GenMRRedSink3 implements No
       ctx.setCurrTask(reducerTask);
     }
 
-    mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(),
+    mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(),
         ctx.getCurrAliasId()));
 
     // the union operator has been processed
     ctx.setCurrUnionOp(null);
-    return null;
+    return true;
   }
 }
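
[Editor's note, not part of the patch] With the reducer now hanging off an
optional ReduceWork, the old plan.getReducer() == reducer comparison needs a
null guard for map-only plans. The pattern, isolated (plan and reducer as in
the hunk above):

    // Null-safe rewrite of "plan.getReducer() == reducer":
    ReduceWork rWork = plan.getReduceWork();
    boolean sameReducer = rWork != null && rWork.getReducer() == reducer;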


