hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ser...@apache.org
Subject svn commit: r1662810 - in /hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io: decode/ encoded/
Date Fri, 27 Feb 2015 20:28:12 GMT
Author: sershe
Date: Fri Feb 27 20:28:12 2015
New Revision: 1662810

URL: http://svn.apache.org/r1662810
Log:
Some minor cleanup

Modified:
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java?rev=1662810&r1=1662809&r2=1662810&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java	(original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java	Fri Feb 27 20:28:12 2015
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.llap.Consu
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.io.encoded.EncodedDataProducer;
 import org.apache.hadoop.hive.llap.io.encoded.EncodedDataReader;
 import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataProducer;
@@ -48,7 +49,7 @@ public abstract class ColumnVectorProduc
   }
 
   private final class UncaughtErrorHandler implements FutureCallback<Void> {
-    private final EncodedDataConsumer edc;
+    private final EncodedDataConsumer<BatchKey> edc;
 
     private UncaughtErrorHandler(EncodedDataConsumer edc) {
       this.edc = edc;
@@ -62,6 +63,7 @@ public abstract class ColumnVectorProduc
     @Override
     public void onFailure(Throwable t) {
       // Reader is not supposed to throw AFTER calling setError.
+      LlapIoImpl.LOG.error("Unhandled error from reader thread " + t.getMessage());
       edc.setError(t);
     }
   }
@@ -79,12 +81,7 @@ public abstract class ColumnVectorProduc
     // Get the source of encoded data.
     EncodedDataProducer<BatchKey> edp = getEncodedDataProducer();
     // Create the consumer of encoded data; it will coordinate decoding to CVBs.
-    final EncodedDataConsumer edc;
-    if (edp instanceof OrcEncodedDataProducer) {
-      edc = new OrcEncodedDataConsumer(this, consumer, columnIds.size());
-    } else {
-      edc = new EncodedDataConsumer(this, consumer, columnIds.size());
-    }
+    final EncodedDataConsumer<BatchKey> edc = createConsumer(this, consumer, columnIds.size());
     // Then, get the specific reader of encoded data out of the producer.
     EncodedDataReader<BatchKey> reader = edp.createReader(
         split, columnIds, sarg, columnNames, edc);
@@ -98,9 +95,11 @@ public abstract class ColumnVectorProduc
     return edc;
   }
 
+  protected abstract EncodedDataConsumer<BatchKey> createConsumer(
+      ColumnVectorProducer<BatchKey> cvp, Consumer<ColumnVectorBatch> consumer, int size);
+
   protected abstract EncodedDataProducer<BatchKey> getEncodedDataProducer();
 
-  protected abstract void decodeBatch(EncodedDataConsumer<BatchKey> batchKeyEncodedDataConsumer,
-      EncodedColumnBatch<BatchKey> batch,
-      Consumer<ColumnVectorBatch> downstreamConsumer);
+  protected abstract void decodeBatch(EncodedDataConsumer<BatchKey> context,
+      EncodedColumnBatch<BatchKey> batch, Consumer<ColumnVectorBatch> downstreamConsumer);
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java?rev=1662810&r1=1662809&r2=1662810&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java	(original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java	Fri Feb 27 20:28:12 2015
@@ -24,12 +24,13 @@ import java.util.List;
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 
 /**
  *
  */
-public class EncodedDataConsumer<BatchKey> implements ConsumerFeedback<ColumnVectorBatch>,
+public abstract class EncodedDataConsumer<BatchKey> implements ConsumerFeedback<ColumnVectorBatch>,
     Consumer<EncodedColumnBatch<BatchKey>> {
   private volatile boolean isStopped = false;
   // TODO: use array, precreate array based on metadata first? Works for ORC. For now keep dumb.
@@ -37,7 +38,7 @@ public class EncodedDataConsumer<BatchKe
   private ConsumerFeedback<EncodedColumnBatch.StreamBuffer> upstreamFeedback;
   private final Consumer<ColumnVectorBatch> downstreamConsumer;
   private final int colCount;
-  private ColumnVectorProducer cvp;
+  private ColumnVectorProducer<BatchKey> cvp;
 
   public EncodedDataConsumer(ColumnVectorProducer<BatchKey> cvp,
       Consumer<ColumnVectorBatch> consumer, int colCount) {
@@ -138,7 +139,8 @@ public class EncodedDataConsumer<BatchKe
       batches.addAll(pendingData.values());
       pendingData.clear();
     }
-    List<EncodedColumnBatch.StreamBuffer> dataToDiscard = new ArrayList<EncodedColumnBatch.StreamBuffer>(batches.size() * colCount * 2);
+    List<EncodedColumnBatch.StreamBuffer> dataToDiscard = new ArrayList<StreamBuffer>(
+        batches.size() * colCount * 2);
     for (EncodedColumnBatch<BatchKey> batch : batches) {
       synchronized (batch) {
         for (EncodedColumnBatch.StreamBuffer[] bb : batch.columnData) {

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java?rev=1662810&r1=1662809&r2=1662810&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java	(original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java	Fri Feb 27 20:28:12 2015
@@ -58,7 +58,7 @@ public class OrcColumnVectorProducer ext
   private final OrcMetadataCache _metadataCache;
   private boolean _skipCorrupt;
   private int _previousStripeIndex;
-
+ 
   public OrcColumnVectorProducer(ExecutorService executor, OrcEncodedDataProducer edp,
       Configuration conf) {
     super(executor);
@@ -78,10 +78,10 @@ public class OrcColumnVectorProducer ext
   }
 
   @Override
-  protected void decodeBatch(EncodedDataConsumer<OrcBatchKey> edc,
+  protected void decodeBatch(EncodedDataConsumer<OrcBatchKey> context,
       EncodedColumnBatch<OrcBatchKey> batch,
       Consumer<ColumnVectorBatch> downstreamConsumer) {
-    OrcEncodedDataConsumer oedc = (OrcEncodedDataConsumer) edc;
+    OrcEncodedDataConsumer oedc = (OrcEncodedDataConsumer)context;
     String fileName = batch.batchKey.file;
     int currentStripeIndex = batch.batchKey.stripeIx;
     if (_previousStripeIndex == -1) {
@@ -446,4 +446,10 @@ public class OrcColumnVectorProducer ext
     return rowIndexEntry.getStatistics().getNumberOfValues();
   }
 
+  @Override
+  protected EncodedDataConsumer<OrcBatchKey> createConsumer(ColumnVectorProducer<OrcBatchKey> cvp,
+      Consumer<ColumnVectorBatch> consumer, int colCount) {
+    return new OrcEncodedDataConsumer(cvp, consumer, colCount);
+  }
+
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java?rev=1662810&r1=1662809&r2=1662810&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java	(original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java	Fri Feb 27 20:28:12 2015
@@ -18,22 +18,18 @@
 package org.apache.hadoop.hive.llap.io.decode;
 
 import org.apache.hadoop.hive.llap.Consumer;
+import org.apache.hadoop.hive.llap.ConsumerFeedback;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
+import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
 
-/**
- *
- */
 public class OrcEncodedDataConsumer extends EncodedDataConsumer<OrcBatchKey> {
-  private ColumnVectorProducer cvp;
-  private Consumer consumer;
   private RecordReaderImpl.TreeReader[] columnReaders;
 
-  public OrcEncodedDataConsumer(ColumnVectorProducer cvp, Consumer consumer, int colCount) {
+  public OrcEncodedDataConsumer(
+      ColumnVectorProducer<OrcBatchKey> cvp, Consumer<ColumnVectorBatch> consumer, int colCount) {
     super(cvp, consumer, colCount);
-    this.cvp = cvp;
-    this.consumer = consumer;
-    this.columnReaders = null;
   }
 
   public void setColumnReaders(RecordReaderImpl.TreeReader[] columnReaders) {

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java?rev=1662810&r1=1662809&r2=1662810&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java	(original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java	Fri Feb 27 20:28:12 2015
@@ -384,10 +384,6 @@ public class OrcEncodedDataProducer impl
         if (value == null || !value.hasAllIndexes(globalInc)) {
           ensureMetadataReader();
           StripeInformation si = metadata.getStripes().get(stripeKey.stripeIx);
-          if (DebugUtils.isTraceOrcEnabled()) {
-            LlapIoImpl.LOG.info("Creating stripe reader " + stripeKey.stripeIx + ": " 
-                + si.getOffset() + ", " + si.getLength());
-          }
           if (value == null) {
             value = new OrcStripeMetadata(metadataReader, si, globalInc, sargColumns);
             metadataCache.putStripeMetadata(stripeKey, value);



Mime
View raw message