hive-commits mailing list archives

From ser...@apache.org
Subject svn commit: r1672782 - in /hive/branches/llap: common/src/java/org/apache/hive/common/util/ common/src/test/org/apache/hive/common/util/ llap-client/src/java/org/apache/hadoop/hive/llap/io/api/ llap-client/src/java/org/apache/hadoop/hive/llap/io/api/or...
Date Fri, 10 Apr 2015 22:33:44 GMT
Author: sershe
Date: Fri Apr 10 22:33:43 2015
New Revision: 1672782

URL: http://svn.apache.org/r1672782
Log:
HIVE-10154 : LLAP: GC issues 1 (Sergey Shelukhin)

Added:
    hive/branches/llap/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java
    hive/branches/llap/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java
Modified:
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumnBatch.java
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcBatchKey.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java

Added: hive/branches/llap/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java?rev=1672782&view=auto
==============================================================================
--- hive/branches/llap/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java (added)
+++ hive/branches/llap/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java Fri Apr 10 22:33:43 2015
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.common.util;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/** Simple object pool of limited size. Implemented as a lock-free ring buffer;
+ * may fail to produce items if there are too many concurrent users. */
+public class FixedSizedObjectPool<T> {
+  public static final Log LOG = LogFactory.getLog(FixedSizedObjectPool.class);
+
+  /**
+   * Object helper for objects stored in the pool.
+   */
+  public static abstract class PoolObjectHelper<T> {
+    /** Called to create an object when one cannot be provided. */
+    protected abstract T create();
+    /** Called before the object is put in the pool (regardless of whether put succeeds),
+     *  if the pool size is not 0. */
+    protected void resetBeforeOffer(T t) {}
+  }
+
+  /**
+   * Ring buffer has two "markers" - where objects are present ('objects' list), and where they are
+   * removed ('empty' list). This class contains bit shifts and masks for one marker's components
+   * within a long, and provides utility methods to get/set the components.
+   * Marker consists of (examples here for 'objects' list; same for 'empty' list):
+   *  - the marker itself. Set to NO_MARKER if list is empty (e.g. no objects to take from pool),
+   *    otherwise contains the array index of the first element of the list.
+   *  - the 'delta'. Number of elements from the marker that is being modified. Each concurrent
+   *    modification (e.g. take call) increments this to claim an array index. Delta elements
+   *    from the marker cannot be touched by other threads. Delta can never overshoot the other
+   *    marker (or own marker if other is empty), or overflow MAX_DELTA. If delta is set to
+   *    NO_DELTA, it means the marker has been modified during 'take' operation and list cannot
+   *    be touched (see below). In any of these cases, take returns null.
+   *  - the 'refcount'/'rc'. Number of operations occurring on the marker. Each op (e.g. take) incs
+   *    this; when the last of the overlapping operations decreases the refcount, it 'commits'
+   *    the modifications by moving the marker according to delta and resetting delta to 0.
+   *    If the other list does not exist, it's also created (i.e. first 'offer' to a new pool with
+   *    empty 'objects' list will create the 'objects' list); if the list is being exhausted to empty
+   *    by other op (e.g. pool has 2 objects, 2 takes are in progress when offer commits), the
+   *    marker of the other list is still reset to new location, and delta is set to NO_DELTA,
+   *    preventing operations on the lists until the exhausting ops commit and set delta to 0.
+   */
+  private static final class Marker {
+    // Currently the long must fit 2 markers. Setting these bit sizes determines the balance
+    // between the max pool size allowed and the max concurrency allowed. The current balance is
+    // not ideal (up to 254 concurrent ops of each kind vs. a 65535-object size limit), but it
+    // uses whole bytes and is good for now. Delta and RC take the same number of bits; usually
+    // it doesn't make sense to have more delta bits than RC bits.
+    private static final long MARKER_MASK = 0xffffL, DELTA_MASK = 0xffL, RC_MASK = 0xffL;
+    public Marker(int markerShift, int deltaShift, int rcShift) {
+      this.markerShift = markerShift;
+      this.deltaShift = deltaShift;
+      this.rcShift = rcShift;
+    }
+    int markerShift, deltaShift, rcShift;
+
+    public final long setMarker(long dest, long val) {
+      return setValue(dest, val, markerShift, MARKER_MASK);
+    }
+
+    public final long setDelta(long dest, long val) {
+      return setValue(dest, val, deltaShift, DELTA_MASK);
+    }
+
+    public final long setRc(long dest, long val) {
+      return setValue(dest, val, rcShift, RC_MASK);
+    }
+
+    public final long getMarker(long src) {
+      return getValue(src, markerShift, MARKER_MASK);
+    }
+
+    public final long getDelta(long src) {
+      return getValue(src, deltaShift, DELTA_MASK);
+    }
+
+    public final long getRc(long src) {
+      return getValue(src, rcShift, RC_MASK);
+    }
+
+    private final long setValue(long dest, long val, int offset, long mask) {
+      return (dest & (~(mask << offset))) + (val << offset);
+    }
+
+    private final long getValue(long src, int offset, long mask) {
+      return (src >>> offset) & mask;
+    }
+
+    public String toString(long markers) {
+      return "{" + getMarker(markers) + ", " + getDelta(markers) + ", " + getRc(markers) + "}";
+    }
+  }
+  private static final long NO_MARKER = Marker.MARKER_MASK, NO_DELTA = Marker.DELTA_MASK,
+      MAX_DELTA = NO_DELTA - 1, MAX_SIZE = NO_MARKER - 1;
+  private static final long NO_INDEX = 0; // Sentinel: no array index could be reserved.
+
+  // See Marker class comment.
+  private static final Marker OBJECTS = new Marker(48, 40, 32);
+  private static final Marker EMPTY = new Marker(16, 8, 0);
+  private final AtomicLong state;
+  private final PoolObjectHelper<T> helper;
+  private final T[] pool;
+
+  public FixedSizedObjectPool(int size, PoolObjectHelper<T> helper) {
+    this(size, helper, LOG.isTraceEnabled());
+  }
+
+  @VisibleForTesting
+  public FixedSizedObjectPool(int size, PoolObjectHelper<T> helper, boolean doTraceLog) {
+    if (size > MAX_SIZE) {
+      throw new AssertionError("Size must be <= " + MAX_SIZE);
+    }
+    this.helper = helper;
+    @SuppressWarnings("unchecked")
+    T[] poolTmp = (T[])new Object[size];
+    pool = poolTmp;
+    // Initially, all deltas and rcs are 0; empty list starts at 0; there are no objects to take.
+    state = new AtomicLong(OBJECTS.setMarker(0, NO_MARKER));
+    casLog = doTraceLog ? new CasLog() : null;
+  }
+
+  public T take() {
+    T result = pool.length > 0 ? takeImpl() : null;
+    return (result == null) ? helper.create() : result;
+  }
+
+  public boolean offer(T t) {
+    if (t == null || pool.length == 0) return false; // 0 size means no-pooling case - passthru.
+    helper.resetBeforeOffer(t);
+    return offerImpl(t);
+  }
+
+  private T takeImpl() {
+    long oldState = reserveArrayIndex(OBJECTS, EMPTY);
+    if (oldState == NO_INDEX) return null; // For whatever reason, reserve failed.
+    long originalMarker = OBJECTS.getMarker(oldState), delta = OBJECTS.getDelta(oldState);
+    int arrayIndex = (int)getArrayIndex(originalMarker, delta);
+    T result = pool[arrayIndex];
+    if (result == null) {
+      throwError(oldState, arrayIndex, "null");
+    }
+    pool[arrayIndex] = null;
+    commitArrayIndex(OBJECTS, EMPTY, originalMarker);
+    return result;
+  }
+
+  private boolean offerImpl(T t) {
+    long oldState = reserveArrayIndex(EMPTY, OBJECTS);
+    if (oldState == NO_INDEX) return false; // For whatever reason, reserve failed.
+    long originalMarker = EMPTY.getMarker(oldState), delta = EMPTY.getDelta(oldState);
+    int arrayIndex = (int)getArrayIndex(originalMarker, delta);
+    if (pool[arrayIndex] != null) {
+      throwError(oldState, arrayIndex, "non-null");
+    }
+    pool[arrayIndex] = t;
+    commitArrayIndex(EMPTY, OBJECTS, originalMarker);
+    return true;
+  }
+
+  private void throwError(long oldState, int arrayIndex, String type) {
+    long newState = state.get();
+    if (casLog != null) {
+      casLog.dumpLog(true);
+    }
+    String msg = "Unexpected " + type + " at " + arrayIndex + "; state was "
+        + toString(oldState) + ", now " + toString(newState);
+    LOG.info(msg);
+    throw new AssertionError(msg);
+  }
+
+  private long reserveArrayIndex(Marker from, Marker to) {
+    while (true) {
+      long oldVal = state.get(), marker = from.getMarker(oldVal), delta = from.getDelta(oldVal),
+          rc = from.getRc(oldVal), toMarker = to.getMarker(oldVal), toDelta = to.getDelta(oldVal);
+      if (marker == NO_MARKER) return NO_INDEX; // The list is empty.
+      if (delta == MAX_DELTA) return NO_INDEX; // Too many concurrent operations; spurious failure.
+      if (delta == NO_DELTA) return NO_INDEX; // List is drained and recreated concurrently.
+      if (toDelta == NO_DELTA) { // Same for the OTHER list; spurious.
+        // TODO: the fact that concurrent re-creation of other list necessitates full stop is not
+        //       ideal... the reason is that the list NOT being re-created still uses the list
+        //       being re-created for boundary check; it needs the old value of the other marker.
+        //       However, NO_DELTA means the other marker was already set to a new value. For now,
+        //       assume concurrent re-creation is rare and the gap before commit is tiny.
+        return NO_INDEX;
+      }
+      assert rc <= delta; // There can never be more concurrent takers than uncommitted ones.
+
+      long newDelta = incDeltaValue(marker, toMarker, delta); // Increase target list pos.
+      if (newDelta == NO_DELTA) return NO_INDEX; // Target list is being drained.
+      long newVal = from.setRc(from.setDelta(oldVal, newDelta), rc + 1); // Set delta and refcount.
+      if (setState(oldVal, newVal)) return oldVal;
+    }
+  }
+
+  private void commitArrayIndex(Marker from, Marker to, long originalMarker) {
+    while (true) {
+      long oldVal = state.get(), rc = from.getRc(oldVal);
+      long newVal = from.setRc(oldVal, rc - 1); // Decrease refcount.
+      assert rc > 0;
+      if (rc == 1) {
+        // We are the last of the concurrent operations to finish. Commit.
+        long marker = from.getMarker(oldVal), delta = from.getDelta(oldVal),
+            otherMarker = to.getMarker(oldVal), otherDelta = to.getDelta(oldVal);
+        assert rc <= delta;
+        // Move marker according to delta, change delta to 0.
+        long newMarker = applyDeltaToMarker(marker, otherMarker, delta);
+        newVal = from.setDelta(from.setMarker(newVal, newMarker), 0);
+        if (otherMarker == NO_MARKER) {
+          // The other list doesn't exist, create it at the first index of our op.
+          assert otherDelta == 0;
+          newVal = to.setMarker(newVal, originalMarker);
+        } else if (otherDelta > 0 && otherDelta != NO_DELTA
+            && applyDeltaToMarker(otherMarker, marker, otherDelta) == NO_MARKER) {
+          // The other list will be exhausted when it commits. Create new one pending that commit.
+          newVal = to.setDelta(to.setMarker(newVal, originalMarker), NO_DELTA);
+        }
+      }
+      if (setState(oldVal, newVal)) return;
+    }
+  }
+
+  private boolean setState(long oldVal, long newVal) {
+    boolean result = state.compareAndSet(oldVal, newVal);
+    if (result && casLog != null) {
+      casLog.log(oldVal, newVal);
+    }
+    return result;
+  }
+
+  private long incDeltaValue(long markerFrom, long otherMarker, long delta) {
+    if (delta == pool.length) return NO_DELTA; // The (pool-sized) list is being fully drained.
+    long result = delta + 1;
+    if (getArrayIndex(markerFrom, result) == getArrayIndex(otherMarker, 1)) {
+      return NO_DELTA; // The list is being drained, cannot increase the delta anymore.
+    }
+    return result;
+  }
+
+  private long applyDeltaToMarker(long marker, long markerLimit, long delta) {
+    if (delta == NO_DELTA) return marker; // List was recreated while we were exhausting it.
+    if (delta == pool.length) {
+      assert markerLimit == NO_MARKER; // If we had the entire pool, other list couldn't exist.
+      return NO_MARKER; // We exhausted the entire-pool-sized list.
+    }
+    marker = getArrayIndex(marker, delta); // Just move the marker according to delta.
+    if (marker == markerLimit) return NO_MARKER; // We hit the limit - the list was exhausted.
+    return marker;
+  }
+
+  private long getArrayIndex(long marker, long delta) {
+    marker += delta;
+    if (marker >= pool.length) {
+      marker -= pool.length; // Wrap around at the end of buffer.
+    }
+    return marker;
+  }
+
+  static String toString(long markers) {
+    return OBJECTS.toString(markers) + ", " + EMPTY.toString(markers);
+  }
+
+  // TODO: Temporary for debugging. Doesn't interfere with MTT failures (unlike LOG.debug).
+  private final static class CasLog {
+    private final int size;
+    private final long[] log;
+    private final AtomicLong offset = new AtomicLong(-1);
+
+    public CasLog() {
+      size = 1 << 14 /* 16K longs = 128KB */;
+      log = new long[size];
+    }
+
+    public void log(long oldVal, long newVal) {
+      int ix = (int)((offset.incrementAndGet() << 1) & (size - 1));
+      log[ix] = oldVal;
+      log[ix + 1] = newVal;
+    }
+
+    public synchronized void dumpLog(boolean doSleep) {
+      if (doSleep) {
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+        }
+      }
+      int logSize = (int)offset.get();
+      // TODO: dump the end if wrapping around?
+      for (int i = 0; i < logSize; ++i) {
+        LOG.info("CAS history dump: " + FixedSizedObjectPool.toString(log[i << 1]) + " => "
+            + FixedSizedObjectPool.toString(log[(i << 1) + 1]));
+      }
+      offset.set(0);
+    }
+  }
+  private final CasLog casLog;
+}
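
For orientation, here is a minimal usage sketch of the new pool (standalone code, not part of the patch; the pooled type, pool size, and class name below are arbitrary). The helper creates an object whenever the pool cannot supply one, and resets an object before it is offered back:

import org.apache.hive.common.util.FixedSizedObjectPool;

public class PoolUsageSketch {
  public static void main(String[] args) {
    FixedSizedObjectPool<StringBuilder> pool = new FixedSizedObjectPool<>(16,
        new FixedSizedObjectPool.PoolObjectHelper<StringBuilder>() {
          @Override
          protected StringBuilder create() {
            return new StringBuilder(); // called when the pool has nothing to hand out
          }
          @Override
          protected void resetBeforeOffer(StringBuilder sb) {
            sb.setLength(0); // clear state before the object goes back into the ring
          }
        });

    StringBuilder sb = pool.take();  // a recycled instance, or a fresh one from create()
    sb.append("scratch work");
    boolean pooled = pool.offer(sb); // false if the pool is full (or too contended)
    System.out.println("returned to pool: " + pooled);
  }
}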

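To make the Marker bit layout concrete, here is a small standalone demo (again, not part of the patch) that packs and unpacks one marker the same way Marker.setValue/getValue do, using the OBJECTS shifts (48/40/32) and the 16/8/8-bit masks from the class:

public class MarkerLayoutDemo {
  // Masks and shifts copied from FixedSizedObjectPool.Marker and the OBJECTS instance.
  static final long MARKER_MASK = 0xffffL, DELTA_MASK = 0xffL, RC_MASK = 0xffL;
  static final int MARKER_SHIFT = 48, DELTA_SHIFT = 40, RC_SHIFT = 32;

  static long setValue(long dest, long val, int offset, long mask) {
    return (dest & ~(mask << offset)) + (val << offset); // clear the field, then add the new value
  }

  static long getValue(long src, int offset, long mask) {
    return (src >>> offset) & mask;
  }

  public static void main(String[] args) {
    long state = 0;
    state = setValue(state, 5, MARKER_SHIFT, MARKER_MASK); // 'objects' list starts at index 5
    state = setValue(state, 2, DELTA_SHIFT, DELTA_MASK);   // two uncommitted takes in flight
    state = setValue(state, 2, RC_SHIFT, RC_MASK);         // both still hold a refcount
    System.out.printf("state=0x%016x marker=%d delta=%d rc=%d%n", state,
        getValue(state, MARKER_SHIFT, MARKER_MASK),
        getValue(state, DELTA_SHIFT, DELTA_MASK),
        getValue(state, RC_SHIFT, RC_MASK));
    // Prints: state=0x0005020200000000 marker=5 delta=2 rc=2
  }
}
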
Added: hive/branches/llap/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java?rev=1672782&view=auto
==============================================================================
--- hive/branches/llap/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java (added)
+++ hive/branches/llap/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java Fri Apr 10 22:33:43 2015
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.common.util;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hive.common.util.FixedSizedObjectPool;
+import org.junit.Test;
+
+public class TestFixedSizedObjectPool {
+
+  private static abstract class PoolRunnable implements Runnable {
+    protected final FixedSizedObjectPool<Object> pool;
+    private final CountDownLatch cdlIn;
+    private final CountDownLatch cdlOut;
+    public final List<Object> objects = new ArrayList<>();
+    private final int count;
+
+    PoolRunnable(FixedSizedObjectPool<Object> pool,
+        CountDownLatch cdlIn, CountDownLatch cdlOut, int count) {
+      this.pool = pool;
+      this.cdlIn = cdlIn;
+      this.cdlOut = cdlOut;
+      this.count = count;
+    }
+
+    public void run() {
+      syncThreadStart(cdlIn, cdlOut);
+      for (int i = 0; i < count; ++i) {
+        doOneOp();
+      }
+    }
+
+    protected abstract void doOneOp();
+  }
+
+  private static final class OfferRunnable extends PoolRunnable {
+    OfferRunnable(FixedSizedObjectPool<Object> pool,
+        CountDownLatch cdlIn, CountDownLatch cdlOut, int count) {
+      super(pool, cdlIn, cdlOut, count);
+    }
+
+    protected void doOneOp() {
+      Object o = new Object();
+      if (pool.offer(o)) {
+        objects.add(o);
+      }
+    }
+  }
+
+  private static final class TakeRunnable extends PoolRunnable {
+    TakeRunnable(FixedSizedObjectPool<Object> pool,
+        CountDownLatch cdlIn, CountDownLatch cdlOut, int count) {
+      super(pool, cdlIn, cdlOut, count);
+    }
+
+    protected void doOneOp() {
+      Object o = pool.take();
+      if (o != OneObjHelper.THE_OBJECT) {
+        objects.add(o);
+      }
+    }
+  }
+
+  private static class DummyHelper extends FixedSizedObjectPool.PoolObjectHelper<Object> {
+    @Override
+    public Object create() {
+      return new Object();
+    }
+  }
+
+  private static class OneObjHelper extends FixedSizedObjectPool.PoolObjectHelper<Object> {
+    public static final Object THE_OBJECT = new Object();
+    @Override
+    public Object create() {
+      return THE_OBJECT;
+    }
+  }
+
+  @Test
+  public void testFullEmpty() {
+    final int SIZE = 8;
+    HashSet<Object> offered = new HashSet<>();
+    FixedSizedObjectPool<Object> pool = new FixedSizedObjectPool<>(SIZE, new DummyHelper(), true);
+    Object newObj = pool.take();
+    for (int i = 0; i < SIZE; ++i) {
+      Object obj = new Object();
+      offered.add(obj);
+      assertTrue(pool.offer(obj));
+    }
+    assertFalse(pool.offer(newObj));
+    for (int i = 0; i < SIZE; ++i) {
+      Object obj = pool.take();
+      assertTrue(offered.remove(obj));
+    }
+    assertTrue(offered.isEmpty());
+    Object newObj2 = pool.take();
+    assertNotSame(newObj, newObj2);
+  }
+
+  public static final Log LOG = LogFactory.getLog(TestFixedSizedObjectPool.class);
+
+  @Test
+  public void testMTT1() {
+    testMTTImpl(1, 3, 3);
+  }
+
+  @Test
+  public void testMTT8() {
+    testMTTImpl(8, 3, 3);
+  }
+
+  @Test
+  public void testMTT4096() {
+    testMTTImpl(4096, 3, 3);
+  }
+
+  @Test
+  public void testMTT4096_1() {
+    testMTTImpl(4096, 1, 1);
+  }
+
+  @Test
+  public void testMTT20000() {
+    testMTTImpl(20000, 3, 3);
+  }
+
+  @Test
+  public void testMTT4096_10() {
+    testMTTImpl(4096, 10, 10);
+  }
+
+  public void testMTTImpl(int size, int takerCount, int giverCount) {
+    final int TASK_COUNT = takerCount + giverCount, GIVECOUNT = 15000, TAKECOUNT = 15000;
+    ExecutorService executor = Executors.newFixedThreadPool(TASK_COUNT);
+    final CountDownLatch cdlIn = new CountDownLatch(TASK_COUNT), cdlOut = new CountDownLatch(1);
+    final FixedSizedObjectPool<Object> pool =
+        new FixedSizedObjectPool<>(size, new OneObjHelper(), true);
+    // Pre-fill the pool halfway.
+    HashSet<Object> allGiven = new HashSet<>();
+    for (int i = 0; i < (size >> 1); ++i) {
+      Object o = new Object();
+      allGiven.add(o);
+      assertTrue(pool.offer(o));
+    }
+    @SuppressWarnings("unchecked")
+    FutureTask<Object>[] tasks = new FutureTask[TASK_COUNT];
+    TakeRunnable[] takers = new TakeRunnable[takerCount];
+    OfferRunnable[] givers = new OfferRunnable[giverCount];
+    int ti = 0;
+    for (int i = 0; i < takerCount; ++i, ++ti) {
+      takers[i] = new TakeRunnable(pool, cdlIn, cdlOut, TAKECOUNT);
+      tasks[ti] = new FutureTask<Object>(takers[i], null);
+      executor.execute(tasks[ti]);
+    }
+    for (int i = 0; i < giverCount; ++i, ++ti) {
+      givers[i] = new OfferRunnable(pool, cdlIn, cdlOut, GIVECOUNT);
+      tasks[ti] = new FutureTask<Object>(givers[i], null);
+      executor.execute(tasks[ti]);
+    }
+    long time = 0;
+    try {
+      cdlIn.await(); // Wait for all threads to be ready.
+      time = System.nanoTime();
+      cdlOut.countDown(); // Release them at the same time.
+      for (int i = 0; i < TASK_COUNT; ++i) {
+        tasks[i].get();
+      }
+      time = (System.nanoTime() - time);
+    } catch (Throwable t) {
+      throw new RuntimeException(t);
+    }
+    int given = allGiven.size(), takenOld = 0;
+    for (OfferRunnable g : givers) {
+      for (Object o : g.objects) {
+        assertTrue(allGiven.add(o));
+        ++given;
+      }
+    }
+    for (TakeRunnable t : takers) {
+      for (Object o : t.objects) {
+        assertTrue(allGiven.remove(o));
+        ++takenOld;
+      }
+    }
+    LOG.info("MTT test - size " + size + ", takers/givers "
+        + takerCount + "/" + giverCount + "; offered " + (given - (size >> 1)) + " (attempted "
+        + (GIVECOUNT * giverCount) + "); reused " + takenOld + ", allocated "
+        + ((TAKECOUNT * takerCount) - takenOld) + " (took " + time/1000000L
+        + "ms including thread sync)");
+    // Most of the above will be failed offers and takes (due to speed of the thing).
+    // Verify that we can drain the pool, then cycle it, i.e. the state is not corrupted.
+    while (pool.take() != OneObjHelper.THE_OBJECT);
+    for (int i = 0; i < size; ++i) {
+      assertTrue(pool.offer(new Object()));
+    }
+    assertFalse(pool.offer(new Object()));
+    for (int i = 0; i < size; ++i) {
+      assertTrue(OneObjHelper.THE_OBJECT != pool.take());
+    }
+    assertTrue(OneObjHelper.THE_OBJECT == pool.take());
+  }
+
+  private static void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
+    cdlIn.countDown();
+    try {
+      cdlOut.await();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumnBatch.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumnBatch.java?rev=1672782&r1=1672781&r2=1672782&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumnBatch.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumnBatch.java Fri Apr 10 22:33:43 2015
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.llap.io.a
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
 
 public class EncodedColumnBatch<BatchKey> {
@@ -37,13 +36,19 @@ public class EncodedColumnBatch<BatchKey
     // LlapMemoryBuffer 500 times, have a separate refcount on StreamBuffer itself.
     public AtomicInteger refCount = new AtomicInteger(0);
 
-    public StreamBuffer(int kind) {
-      this.streamKind = kind;
+    public void init(int kind) {
+      streamKind = kind;
+    }
+
+    public void reset() {
+      cacheBuffers.clear();
+      refCount.set(0);
     }
 
     public void incRef() {
       refCount.incrementAndGet();
     }
+
     public int decRef() {
       int i = refCount.decrementAndGet();
       assert i >= 0;
@@ -54,19 +59,21 @@ public class EncodedColumnBatch<BatchKey
   public BatchKey batchKey;
   public StreamBuffer[][] columnData;
   public int[] columnIxs;
-  public int colsRemaining = 0;
-
-  public EncodedColumnBatch(BatchKey batchKey, int columnCount, int colsRemaining) {
-    this.batchKey = batchKey;
-    this.columnData = new StreamBuffer[columnCount][];
-    this.columnIxs = new int[columnCount];
-    this.colsRemaining = colsRemaining;
-  }
-
-  public void merge(EncodedColumnBatch<BatchKey> other) {
-    // TODO: this may be called when high-level cache produces several columns and IO produces
-    //       several columns. So, for now this will never be called. Need to merge by columnIx-s.
-    throw new UnsupportedOperationException();
+  /** Generation version, needed to synchronize pooled reuse with the fact that two separate
+   * threads operate on batches - the one that decodes them, and a potential separate thread with
+   * a "stop" call that cleans them up. We don't want the decode thread to use an ECB that was
+   * thrown out and reused, so it remembers the version and checks it after making sure no cleanup
+   * thread can ever get to this ECB anymore. All this sync is ONLY needed because of high-level
+   * cache code (sync in the decode thread is for the map that combines columns coming from cache
+   * and from file), so if we threw this presently-unused code out, we could get rid of this. */
+  public int version = Integer.MIN_VALUE;
+
+  public void reset() {
+    if (columnData != null) {
+      for (int i = 0; i < columnData.length; ++i) {
+        columnData[i] = null;
+      }
+    }
   }
 
   public void initColumn(int colIxMod, int colIx, int streamCount) {

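The new 'version' field is a generation counter that keeps the decode thread from touching an ECB that a cleanup thread has already discarded and recycled through a pool. A rough standalone sketch of the pattern (the class and method names here are illustrative, not the Hive ones): snapshot the version under the map lock, and treat a later mismatch as "already recycled":

import java.util.HashMap;

class VersionGuardSketch {
  static class Batch { int version; String key; }

  private final HashMap<String, Batch> pending = new HashMap<>();

  // Decode path: snapshot the version, then re-check it when taking the batch out of the map.
  boolean tryClaim(Batch b) {
    int seen;
    synchronized (pending) {
      seen = b.version;                 // snapshot under the map lock
    }
    // ... other decode-side work may happen here ...
    synchronized (pending) {
      Batch current = pending.remove(b.key);
      // Stale if cleanup removed it first, or bumped the version before recycling the object.
      return current == b && current.version == seen;
    }
  }

  // Cleanup path: bump the version under the same lock before handing batches back to a pool.
  void discardAll() {
    synchronized (pending) {
      for (Batch batch : pending.values()) {
        ++batch.version;                // invalidates any snapshot taken earlier
      }
      pending.clear();
    }
  }
}
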
Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcBatchKey.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcBatchKey.java?rev=1672782&r1=1672781&r2=1672782&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcBatchKey.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcBatchKey.java Fri Apr 10 22:33:43 2015
@@ -23,6 +23,10 @@ public class OrcBatchKey {
   public int stripeIx, rgIx;
 
   public OrcBatchKey(long file, int stripeIx, int rgIx) {
+    set(file, stripeIx, rgIx);
+  }
+
+  public void set(long file, int stripeIx, int rgIx) {
     this.file = file;
     this.stripeIx = stripeIx;
     this.rgIx = rgIx;

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java?rev=1672782&r1=1672781&r2=1672782&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java Fri Apr 10 22:33:43 2015
@@ -37,4 +37,10 @@ public class ColumnVectorBatch {
     this.cols = new ColumnVector[columnCount];
     this.size = batchSize;
   }
+
+  public void swapColumnVector(int ix, ColumnVector[] other, int otherIx) {
+    ColumnVector old = other[otherIx];
+    other[otherIx] = cols[ix];
+    cols[ix] = old;
+  }
 }
\ No newline at end of file

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java?rev=1672782&r1=1672781&r2=1672782&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java Fri Apr 10 22:33:43 2015
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.llap.count
 import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
 import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
@@ -102,6 +103,7 @@ public class LlapInputFormat
     private final VectorizedRowBatchCtx rbCtx;
 
     private final LinkedList<ColumnVectorBatch> pendingData = new LinkedList<ColumnVectorBatch>();
+    private ColumnVectorBatch lastCvb = null;
     private boolean isFirst = true;
 
     private Throwable pendingError = null;
@@ -155,8 +157,8 @@ public class LlapInputFormat
       }
       // VRB was created from VrbCtx, so we already have pre-allocated column vectors
       for (int i = 0; i < cvb.cols.length; ++i) {
-        int columnId = columnIds.get(i);
-        value.cols[columnId] = cvb.cols[i]; // TODO: reuse CV objects that are replaced
+        // Return old CVs (if any) to caller. We assume these things all have the same schema.
+        cvb.swapColumnVector(i, value.cols, columnIds.get(i));
       }
       value.selectedInUse = false;
       value.size = cvb.size;
@@ -189,8 +191,9 @@ public class LlapInputFormat
     }
 
     ColumnVectorBatch nextCvb() throws InterruptedException, IOException {
-      // TODO: if some collection is needed, return previous ColumnVectorBatch here
-      ColumnVectorBatch current = null;
+      if (lastCvb != null) {
+        feedback.returnData(lastCvb);
+      }
       synchronized (pendingData) {
         // We are waiting for next block. Either we will get it, or be told we are done.
         boolean doLogBlocking = DebugUtils.isTraceMttEnabled() && isNothingToReport();
@@ -204,12 +207,12 @@ public class LlapInputFormat
           LlapIoImpl.LOG.info("next is unblocked");
         }
         rethrowErrorIfAny();
-        current = pendingData.poll();
+        lastCvb = pendingData.poll();
       }
-      if (DebugUtils.isTraceMttEnabled() && current != null) {
-        LlapIoImpl.LOG.info("Processing will receive vector " + current);
+      if (DebugUtils.isTraceMttEnabled() && lastCvb != null) {
+        LlapIoImpl.LOG.info("Processing will receive vector " + lastCvb);
       }
-      return current;
+      return lastCvb;
     }
 
     private boolean isNothingToReport() {

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java?rev=1672782&r1=1672781&r2=1672782&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java Fri Apr 10 22:33:43 2015
@@ -25,32 +25,42 @@ import java.util.concurrent.Callable;
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics;
+import org.apache.hive.common.util.FixedSizedObjectPool;
+import org.apache.hive.common.util.FixedSizedObjectPool.PoolObjectHelper;
 
 /**
  *
  */
-public abstract class EncodedDataConsumer<BatchKey> implements
-  Consumer<EncodedColumnBatch<BatchKey>>, ReadPipeline {
+public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedColumnBatch<BatchKey>>
+  implements Consumer<BatchType>, ReadPipeline {
   private volatile boolean isStopped = false;
   // TODO: use array, precreate array based on metadata first? Works for ORC. For now keep dumb.
-  private final HashMap<BatchKey, EncodedColumnBatch<BatchKey>> pendingData = new HashMap<>();
-  private ConsumerFeedback<EncodedColumnBatch.StreamBuffer> upstreamFeedback;
+  private final HashMap<BatchKey, BatchType> pendingData = new HashMap<>();
+  private ConsumerFeedback<BatchType> upstreamFeedback;
   private final Consumer<ColumnVectorBatch> downstreamConsumer;
   private Callable<Void> readCallable;
-  private final int colCount;
   private final LlapDaemonQueueMetrics queueMetrics;
+  // TODO: if we were using Exchanger, pool would not be necessary here - it would be 1/N items
+  private final static int CVB_POOL_SIZE = 8;
+  // Note that the pool is per EDC - within EDC, CVBs are expected to have the same schema.
+  protected final FixedSizedObjectPool<ColumnVectorBatch> cvbPool;
 
-  public EncodedDataConsumer(Consumer<ColumnVectorBatch> consumer, int colCount,
+  public EncodedDataConsumer(Consumer<ColumnVectorBatch> consumer, final int colCount,
       LlapDaemonQueueMetrics queueMetrics) {
     this.downstreamConsumer = consumer;
-    this.colCount = colCount;
     this.queueMetrics = queueMetrics;
+    cvbPool = new FixedSizedObjectPool<ColumnVectorBatch>(CVB_POOL_SIZE,
+        new PoolObjectHelper<ColumnVectorBatch>() {
+              @Override
+              public ColumnVectorBatch create() {
+                return new ColumnVectorBatch(colCount);
+              }
+        });
   }
 
-  public void init(ConsumerFeedback<EncodedColumnBatch.StreamBuffer> upstreamFeedback,
+  public void init(ConsumerFeedback<BatchType> upstreamFeedback,
       Callable<Void> readCallable) {
     this.upstreamFeedback = upstreamFeedback;
     this.readCallable = readCallable;
@@ -62,10 +72,11 @@ public abstract class EncodedDataConsume
   }
 
   @Override
-  public void consumeData(EncodedColumnBatch<BatchKey> data) {
+  public void consumeData(BatchType data) {
     // TODO: data arrives in whole batches now, not in columns. We could greatly simplify this.
-    EncodedColumnBatch<BatchKey> targetBatch = null;
+    BatchType targetBatch = null;
     boolean localIsStopped = false;
+    Integer targetBatchVersion = null;
     synchronized (pendingData) {
       localIsStopped = isStopped;
       if (!localIsStopped) {
@@ -74,56 +85,52 @@ public abstract class EncodedDataConsume
           targetBatch = data;
           pendingData.put(data.batchKey, data);
         }
+        // We have the map locked; the code that throws things away from the map only bumps the
+        // version under the same map lock; the code that throws things away here only bumps the
+        // version when the batch was taken out of the map.
+        targetBatchVersion = targetBatch.version;
       }
       queueMetrics.setQueueSize(pendingData.size());
     }
     if (localIsStopped) {
-      returnProcessed(data.columnData);
+      returnSourceData(data);
       return;
     }
-
+    assert targetBatchVersion != null;
     synchronized (targetBatch) {
-      // Check if we are stopped and the batch was already cleaned.
-      localIsStopped = (targetBatch.columnData == null);
-      if (!localIsStopped) {
-        if (targetBatch != data) {
-          targetBatch.merge(data);
-        }
-        if (0 == targetBatch.colsRemaining) {
-          synchronized (pendingData) {
-            targetBatch = isStopped ? null : pendingData.remove(data.batchKey);
-          }
-          // Check if we are stopped and the batch had been removed from map.
-          localIsStopped = (targetBatch == null);
-          // We took the batch out of the map. No more contention with stop possible.
-        }
+      if (targetBatch != data) {
+        throw new UnsupportedOperationException("Merging is not supported");
       }
+      synchronized (pendingData) {
+        targetBatch = isStopped ? null : pendingData.remove(data.batchKey);
+        // Check if someone already threw this away and changed the version.
+        localIsStopped = (targetBatchVersion != targetBatch.version);
+      }
+      // We took the batch out of the map. No more contention with stop possible.
     }
-    if (localIsStopped) {
-      returnProcessed(data.columnData);
+    if (localIsStopped && (targetBatch != data)) {
+      returnSourceData(data);
       return;
     }
-    if (0 == targetBatch.colsRemaining) {
-      long start = System.currentTimeMillis();
-      decodeBatch(targetBatch, downstreamConsumer);
-      long end = System.currentTimeMillis();
-      queueMetrics.addProcessingTime(end - start);
-      // Batch has been decoded; unlock the buffers in cache
-      returnProcessed(targetBatch.columnData);
-    }
+    long start = System.currentTimeMillis();
+    decodeBatch(targetBatch, downstreamConsumer);
+    long end = System.currentTimeMillis();
+    queueMetrics.addProcessingTime(end - start);
+    returnSourceData(targetBatch);
+  }
+
+  /**
+   * Returns the ECB to caller for reuse. Only safe to call if the thread is the only owner
+   * of the ECB in question; or, if ECB is still in pendingData, pendingData must be locked.
+   */
+  private void returnSourceData(BatchType data) {
+    ++data.version;
+    upstreamFeedback.returnData(data);
   }
 
-  protected abstract void decodeBatch(EncodedColumnBatch<BatchKey> batch,
+  protected abstract void decodeBatch(BatchType batch,
       Consumer<ColumnVectorBatch> downstreamConsumer);
 
-  protected void returnProcessed(EncodedColumnBatch.StreamBuffer[][] data) {
-    for (EncodedColumnBatch.StreamBuffer[] sbs : data) {
-      for (EncodedColumnBatch.StreamBuffer sb : sbs) {
-        upstreamFeedback.returnData(sb);
-      }
-    }
-  }
-
   @Override
   public void setDone() {
     synchronized (pendingData) {
@@ -143,33 +150,24 @@ public abstract class EncodedDataConsume
 
   @Override
   public void returnData(ColumnVectorBatch data) {
-    // TODO: column vectors could be added to object pool here
+    cvbPool.offer(data);
   }
 
   private void dicardPendingData(boolean isStopped) {
-    List<EncodedColumnBatch<BatchKey>> batches = new ArrayList<EncodedColumnBatch<BatchKey>>(
+    List<BatchType> batches = new ArrayList<BatchType>(
         pendingData.size());
     synchronized (pendingData) {
       if (isStopped) {
         this.isStopped = true;
       }
-      batches.addAll(pendingData.values());
-      pendingData.clear();
-    }
-    List<EncodedColumnBatch.StreamBuffer> dataToDiscard = new ArrayList<StreamBuffer>(
-        batches.size() * colCount * 2);
-    for (EncodedColumnBatch<BatchKey> batch : batches) {
-      synchronized (batch) {
-        for (EncodedColumnBatch.StreamBuffer[] bb : batch.columnData) {
-          for (EncodedColumnBatch.StreamBuffer b : bb) {
-            dataToDiscard.add(b);
-          }
-        }
-        batch.columnData = null;
+      for (BatchType ecb : pendingData.values()) {
+        ++ecb.version;
+        batches.add(ecb);
       }
+      pendingData.clear();
     }
-    for (EncodedColumnBatch.StreamBuffer data : dataToDiscard) {
-      upstreamFeedback.returnData(data);
+    for (BatchType batch : batches) {
+      upstreamFeedback.returnData(batch);
     }
   }
 

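The net effect of these EncodedDataConsumer changes is that each ColumnVectorBatch now goes through a take/offer cycle on a small per-consumer pool instead of being allocated fresh for every batch. A condensed sketch of that lifecycle (the types are simplified stand-ins; only FixedSizedObjectPool is the real class from this patch):

import org.apache.hive.common.util.FixedSizedObjectPool;

class BatchRecycleSketch {
  static class Cvb { Object[] cols; Cvb(int n) { cols = new Object[n]; } } // simplified batch

  private final FixedSizedObjectPool<Cvb> pool;

  BatchRecycleSketch(final int colCount) {
    pool = new FixedSizedObjectPool<>(8, new FixedSizedObjectPool.PoolObjectHelper<Cvb>() {
      @Override
      protected Cvb create() { return new Cvb(colCount); } // same column count per consumer
    });
  }

  Cvb decodeOneBatch() {
    Cvb cvb = pool.take();   // reuse a batch (and its column vectors) when one is available
    // ... fill cvb.cols from the encoded data ...
    return cvb;              // handed to the downstream consumer
  }

  void returnData(Cvb cvb) {
    pool.offer(cvb);         // the downstream consumer gives the batch back once it is done
  }
}
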
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java?rev=1672782&r1=1672781&r2=1672782&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java Fri Apr 10 22:33:43 2015
@@ -30,12 +30,14 @@ import org.apache.hadoop.hive.llap.metri
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.EncodedReaderImpl.OrcEncodedColumnBatch;
 import org.apache.hadoop.hive.ql.io.orc.EncodedTreeReaderFactory;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
 import org.apache.hadoop.hive.ql.io.orc.WriterImpl;
 
-public class OrcEncodedDataConsumer extends EncodedDataConsumer<OrcBatchKey> {
+public class OrcEncodedDataConsumer
+  extends EncodedDataConsumer<OrcBatchKey, OrcEncodedColumnBatch> {
   private EncodedTreeReaderFactory.TreeReader[] columnReaders;
   private int previousStripeIndex = -1;
   private OrcFileMetadata fileMetadata; // We assume one request is only for one file.
@@ -66,7 +68,7 @@ public class OrcEncodedDataConsumer exte
   }
 
   @Override
-  protected void decodeBatch(EncodedColumnBatch<OrcBatchKey> batch,
+  protected void decodeBatch(OrcEncodedColumnBatch batch,
       Consumer<ColumnVectorBatch> downstreamConsumer) {
     int currentStripeIndex = batch.batchKey.stripeIx;
 
@@ -97,11 +99,12 @@ public class OrcEncodedDataConsumer exte
           if (batchSize == 0) break;
         }
 
-        ColumnVectorBatch cvb = new ColumnVectorBatch(batch.columnIxs.length);
+        ColumnVectorBatch cvb = cvbPool.take();
+        assert cvb.cols.length == batch.columnIxs.length; // Must be constant per split.
         cvb.size = batchSize;
 
         for (int idx = 0; idx < batch.columnIxs.length; idx++) {
-          cvb.cols[idx] = (ColumnVector)columnReaders[idx].nextVector(null, batchSize);
+          cvb.cols[idx] = (ColumnVector)columnReaders[idx].nextVector(cvb.cols[idx], batchSize);
         }
 
         // we are done reading a batch, send it to consumer for processing

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java?rev=1672782&r1=1672781&r2=1672782&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java Fri Apr 10 22:33:43 2015
@@ -18,7 +18,6 @@ import org.apache.hadoop.hive.llap.Consu
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.cache.Cache;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
@@ -30,6 +29,8 @@ import org.apache.hadoop.hive.llap.io.me
 import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
 import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
 import org.apache.hadoop.hive.ql.io.orc.EncodedReader;
+import org.apache.hadoop.hive.ql.io.orc.EncodedReaderImpl;
+import org.apache.hadoop.hive.ql.io.orc.EncodedReaderImpl.OrcEncodedColumnBatch;
 import org.apache.hadoop.hive.ql.io.orc.MetadataReader;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions;
@@ -45,8 +46,14 @@ import org.apache.hadoop.hive.ql.io.sarg
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 
+/**
+ * This produces EncodedColumnBatch via ORC EncodedReaderImpl.
+ * It serves as Consumer for EncodedColumnBatch too, for the high-level cache scenario where
+ * it inserts itself into the pipeline to put the data in cache, before passing it to the real
+ * consumer. It also serves as ConsumerFeedback that receives processed EncodedColumnBatch-es.
+ */
 public class OrcEncodedDataReader extends CallableWithNdc<Void>
-    implements ConsumerFeedback<StreamBuffer>, Consumer<EncodedColumnBatch<OrcBatchKey>> {
+    implements ConsumerFeedback<OrcEncodedColumnBatch>, Consumer<OrcEncodedColumnBatch> {
   private static final Log LOG = LogFactory.getLog(OrcEncodedDataReader.class);
 
   private final OrcMetadataCache metadataCache;
@@ -73,7 +80,9 @@ public class OrcEncodedDataReader extend
    * read. Contains only stripes that are read, and only columns included. null => read all RGs.
    */
   private boolean[][][] readState;
-  private volatile boolean isStopped = false, isPaused = false;
+  private volatile boolean isStopped = false;
+  @SuppressWarnings("unused")
+  private volatile boolean isPaused = false;
 
   public OrcEncodedDataReader(LowLevelCache lowLevelCache, Cache<OrcCacheKey> cache,
       OrcMetadataCache metadataCache, Configuration conf, InputSplit split,
@@ -204,7 +213,7 @@ public class OrcEncodedDataReader extend
     // 5. Create encoded data reader.
     // In case if we have high-level cache, we will intercept the data and add it there;
     // otherwise just pass the data directly to the consumer.
-    Consumer<EncodedColumnBatch<OrcBatchKey>> dataConsumer =
+    Consumer<OrcEncodedColumnBatch> dataConsumer =
         (cache == null) ? this.consumer : this;
     try {
       ensureOrcReader();
@@ -447,14 +456,21 @@ public class OrcEncodedDataReader extend
   }
 
   @Override
-  public void returnData(StreamBuffer data) {
-    if (data.decRef() != 0) return;
-    if (DebugUtils.isTraceLockingEnabled()) {
-      for (LlapMemoryBuffer buf : data.cacheBuffers) {
-        LlapIoImpl.LOG.info("Unlocking " + buf + " at the end of processing");
+  public void returnData(OrcEncodedColumnBatch ecb) {
+    for (StreamBuffer[] datas : ecb.columnData) {
+      for (StreamBuffer data : datas) {
+        if (data.decRef() != 0) continue;
+        if (DebugUtils.isTraceLockingEnabled()) {
+          for (LlapMemoryBuffer buf : data.cacheBuffers) {
+            LlapIoImpl.LOG.info("Unlocking " + buf + " at the end of processing");
+          }
+        }
+        lowLevelCache.releaseBuffers(data.cacheBuffers);
+        EncodedReaderImpl.SB_POOL.offer(data);
       }
     }
-    lowLevelCache.releaseBuffers(data.cacheBuffers);
+    // We can offer ECB even with some streams not discarded; reset() will clear the arrays.
+    EncodedReaderImpl.ECB_POOL.offer(ecb);
   }
 
   /**
@@ -576,8 +592,8 @@ public class OrcEncodedDataReader extend
       boolean[] isMissingAnyRgs = new boolean[cols.length];
       int totalRgCount = getRgCount(fileMetadata.getStripes().get(key.stripeIx), rowIndexStride);
       for (int rgIx = 0; rgIx < totalRgCount; ++rgIx) {
-        EncodedColumnBatch<OrcBatchKey> col = new EncodedColumnBatch<OrcBatchKey>(
-            new OrcBatchKey(fileId, key.stripeIx, rgIx), cols.length, cols.length);
+        OrcEncodedColumnBatch col = EncodedReaderImpl.ECB_POOL.take();
+        col.init(fileId, key.stripeIx, rgIx, cols.length);
         boolean hasAnyCached = false;
         try {
           key.rgIx = rgIx;
@@ -633,7 +649,7 @@ public class OrcEncodedDataReader extend
   }
 
   @Override
-  public void consumeData(EncodedColumnBatch<OrcBatchKey> data) {
+  public void consumeData(OrcEncodedColumnBatch data) {
     // Store object in cache; create new key object - cannot be reused.
     assert cache != null;
     for (int i = 0; i < data.columnData.length; ++i) {

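The javadoc above describes OrcEncodedDataReader as an intercepting consumer: when a high-level cache is configured it sits between the encoded reader and the real consumer, storing each batch in the cache before forwarding it (otherwise the real consumer is used directly). A stripped-down sketch of that shape (the single-method Consumer interface here is a simplified stand-in, not the actual Hive interface):

import java.util.HashMap;
import java.util.Map;

class InterceptingConsumerSketch {
  /** Simplified stand-in for a consumer interface; only the method used here. */
  interface Consumer<T> { void consumeData(T data); }

  /** Sits in front of the real consumer; caches each batch, then forwards it unchanged. */
  static class CachingConsumer implements Consumer<String> {
    private final Consumer<String> realConsumer;
    private final Map<Integer, String> cache = new HashMap<>();
    private int nextKey = 0;

    CachingConsumer(Consumer<String> realConsumer) { this.realConsumer = realConsumer; }

    @Override
    public void consumeData(String data) {
      cache.put(nextKey++, data);     // intercept: put the data in the cache first
      realConsumer.consumeData(data); // then hand the same data to the real consumer
    }
  }

  public static void main(String[] args) {
    Consumer<String> real = new Consumer<String>() {
      @Override
      public void consumeData(String data) { System.out.println("decoded: " + data); }
    };
    Consumer<String> pipeline = new CachingConsumer(real); // used only when a cache is present
    pipeline.consumeData("rg-0 column data");
  }
}
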
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java?rev=1672782&r1=1672781&r2=1672782&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java Fri Apr 10 22:33:43 2015
@@ -69,7 +69,7 @@ public class OrcStripeMetadata extends L
     estimatedMemUsage = SIZE_ESTIMATOR.estimate(this, SIZE_ESTIMATORS);
   }
 
-  public OrcStripeMetadata(long id) {
+  private OrcStripeMetadata(long id) {
     stripeKey = new OrcBatchKey(id, 0, 0);
     encodings = new ArrayList<>();
     streams = new ArrayList<>();

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java?rev=1672782&r1=1672781&r2=1672782&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java Fri Apr 10 22:33:43 2015
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -36,7 +37,6 @@ import org.apache.hadoop.hive.llap.io.ap
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 import org.apache.hadoop.hive.ql.io.orc.InStream.TrackedCacheChunk;
-import org.apache.hadoop.hive.ql.io.orc.InStream.TrackedCacheChunkFactory;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
@@ -44,6 +44,8 @@ import org.apache.hadoop.hive.ql.io.orc.
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils.ByteBufferAllocatorPool;
 import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
+import org.apache.hive.common.util.FixedSizedObjectPool;
+import org.apache.hive.common.util.FixedSizedObjectPool.PoolObjectHelper;
 
 
 /**
@@ -76,7 +78,50 @@ import org.apache.hadoop.hive.shims.Hado
  */
 public class EncodedReaderImpl implements EncodedReader {
   public static final Log LOG = LogFactory.getLog(EncodedReaderImpl.class);
-
+  private static final FixedSizedObjectPool<ColumnReadContext> COLCTX_POOL =
+      new FixedSizedObjectPool<>(256, new FixedSizedObjectPool.PoolObjectHelper<ColumnReadContext>() {
+        @Override
+        public ColumnReadContext create() {
+          return new ColumnReadContext();
+        }
+        @Override
+        public void resetBeforeOffer(ColumnReadContext t) {
+          t.reset();
+        }
+      });
+  private static final FixedSizedObjectPool<StreamContext> STREAMCTX_POOL =
+      new FixedSizedObjectPool<>(256, new FixedSizedObjectPool.PoolObjectHelper<StreamContext>() {
+        @Override
+        public StreamContext create() {
+          return new StreamContext();
+        }
+        @Override
+        public void resetBeforeOffer(StreamContext t) {
+          t.reset();
+        }
+      });
+  public static final FixedSizedObjectPool<OrcEncodedColumnBatch> ECB_POOL =
+      new FixedSizedObjectPool<>(1024, new PoolObjectHelper<OrcEncodedColumnBatch>() {
+        @Override
+        protected OrcEncodedColumnBatch create() {
+          return new OrcEncodedColumnBatch();
+        }
+        @Override
+        protected void resetBeforeOffer(OrcEncodedColumnBatch t) {
+          t.reset();
+        }
+      });
+  public static final FixedSizedObjectPool<StreamBuffer> SB_POOL =
+      new FixedSizedObjectPool<>(8192, new PoolObjectHelper<StreamBuffer>() {
+        @Override
+        protected StreamBuffer create() {
+          return new StreamBuffer();
+        }
+        @Override
+        protected void resetBeforeOffer(StreamBuffer t) {
+          t.reset();
+        }
+      });
   private final long fileId;
   private final FSDataInputStream file;
   private final CompressionCodec codec;
@@ -87,13 +132,12 @@ public class EncodedReaderImpl implement
   private final LowLevelCache cache;
   private final ByteBufferAllocatorPool pool;
   // For now, one consumer for all calls.
-  private final Consumer<EncodedColumnBatch<OrcBatchKey>> consumer;
-  // TODO: if used as a pool, pass in externally
-  private final TrackedCacheChunkFactory cacheChunkFactory = new TrackedCacheChunkFactory();
+  private final Consumer<OrcEncodedColumnBatch> consumer;
+
 
   public EncodedReaderImpl(FileSystem fileSystem, Path path, long fileId, boolean useZeroCopy,
       List<OrcProto.Type> types, CompressionCodec codec, int bufferSize, long strideRate,
-      LowLevelCache cache, Consumer<EncodedColumnBatch<OrcBatchKey>> consumer)
+      LowLevelCache cache, Consumer<OrcEncodedColumnBatch> consumer)
           throws IOException {
     this.fileId = fileId;
     this.file = fileSystem.open(path);
@@ -111,27 +155,34 @@ public class EncodedReaderImpl implement
     }
   }
 
-
   /** Helper context for each column being read */
   private static final class ColumnReadContext {
-    public ColumnReadContext(int colIx, ColumnEncoding encoding, RowIndex rowIndex) {
+    public void init(int colIx, ColumnEncoding encoding, RowIndex rowIndex) {
       this.encoding = encoding;
       this.rowIndex = rowIndex;
       this.colIx = colIx;
+      streamCount = 0;
+    }
+    public void reset() {
+      encoding = null;
+      rowIndex = null;
+      streamCount = 0;
+      Arrays.fill(streams, null);
     }
     public static final int MAX_STREAMS = OrcProto.Stream.Kind.ROW_INDEX_VALUE;
     /** The number of streams that are part of this column. */
     int streamCount = 0;
     final StreamContext[] streams = new StreamContext[MAX_STREAMS];
     /** Column encoding. */
-    final ColumnEncoding encoding;
+    ColumnEncoding encoding;
     /** Column rowindex. */
-    final OrcProto.RowIndex rowIndex;
+    OrcProto.RowIndex rowIndex;
     /** Column index in the file. */
-    final int colIx;
+    int colIx;
 
     public void addStream(long offset, OrcProto.Stream stream, int indexIx) {
-      streams[streamCount++] = new StreamContext(stream, offset, indexIx);
+      StreamContext sctx = streams[streamCount++] = STREAMCTX_POOL.take();
+      sctx.init(stream, offset, indexIx);
     }
 
     @Override
@@ -152,16 +203,21 @@ public class EncodedReaderImpl implement
   }
 
   private static final class StreamContext {
-    public StreamContext(OrcProto.Stream stream, long streamOffset, int streamIndexOffset) {
+    public void init(OrcProto.Stream stream, long streamOffset, int streamIndexOffset) {
       this.kind = stream.getKind();
       this.length = stream.getLength();
       this.offset = streamOffset;
       this.streamIndexOffset = streamIndexOffset;
     }
+    void reset() {
+      bufferIter = null;
+      stripeLevelStream = null;
+      kind = null;
+    }
     /** Offsets of each stream in the column. */
-    public final long offset, length;
-    public final int streamIndexOffset;
-    public final OrcProto.Stream.Kind kind;
+    public long offset, length;
+    public int streamIndexOffset;
+    public OrcProto.Stream.Kind kind;
     /** Iterators for the buffers; used to maintain position in per-rg reading. */
     DiskRangeList bufferIter;
     /** Saved stripe-level stream, to reuse for each RG (e.g. dictionaries). */
@@ -178,6 +234,20 @@ public class EncodedReaderImpl implement
     }
   }
 
+  public static final class OrcEncodedColumnBatch extends EncodedColumnBatch<OrcBatchKey> {
+    public void init(long fileId, int stripeIx, int rgIx, int columnCount) {
+      if (batchKey == null) {
+        batchKey = new OrcBatchKey(fileId, stripeIx, rgIx);
+      } else {
+        batchKey.set(fileId, stripeIx, rgIx);
+      }
+      if (columnIxs == null || columnCount != columnIxs.length) {
+        columnIxs = new int[columnCount];
+        columnData = new StreamBuffer[columnCount][];
+      }
+    }
+  }
+
   @Override
   public void readEncodedColumns(int stripeIx, StripeInformation stripe,
       RowIndex[] indexes, List<ColumnEncoding> encodings, List<Stream> streamList,
@@ -200,7 +270,7 @@ public class EncodedReaderImpl implement
     ColumnReadContext[] colCtxs = new ColumnReadContext[colRgs.length];
     boolean[] includedRgs = null;
     boolean isCompressed = (codec != null);
-
+    DiskRangeListMutateHelper toRead = null;
     DiskRangeListCreateHelper listToRead = new DiskRangeListCreateHelper();
     for (OrcProto.Stream stream : streamList) {
       long length = stream.getLength();
@@ -219,8 +289,8 @@ public class EncodedReaderImpl implement
         assert colCtxs[colRgIx] == null;
         lastColIx = colIx;
         includedRgs = colRgs[colRgIx];
-        ctx = colCtxs[colRgIx] = new ColumnReadContext(
-            colIx, encodings.get(colIx), indexes[colIx]);
+        ctx = colCtxs[colRgIx] = COLCTX_POOL.take();
+        ctx.init(colIx, encodings.get(colIx), indexes[colIx]);
         if (DebugUtils.isTraceOrcEnabled()) {
           LOG.info("Creating context " + colRgIx + " for column " + colIx + ":" + ctx.toString());
         }
@@ -250,17 +320,18 @@ public class EncodedReaderImpl implement
 
     if (listToRead.get() == null) {
       LOG.warn("Nothing to read for stripe [" + stripe + "]");
+      releaseContexts(colCtxs);
       return;
     }
 
     // 2. Now, read all of the ranges from cache or disk.
-    DiskRangeListMutateHelper toRead = new DiskRangeListMutateHelper(listToRead.get());
+    toRead = new DiskRangeListMutateHelper(listToRead.get());
     if (DebugUtils.isTraceOrcEnabled()) {
       LOG.info("Resulting disk ranges to read (file " + fileId + "): "
           + RecordReaderUtils.stringifyDiskRanges(toRead.next));
     }
     if (cache != null) {
-      cache.getFileData(fileId, toRead.next, stripeOffset, cacheChunkFactory);
+      cache.getFileData(fileId, toRead.next, stripeOffset, InStream.CC_FACTORY);
       if (DebugUtils.isTraceOrcEnabled()) {
         LOG.info("Disk ranges after cache (file " + fileId + ", base offset " + stripeOffset
             + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
@@ -282,8 +353,8 @@ public class EncodedReaderImpl implement
     for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
       boolean isLastRg = rgIx == rgCount - 1;
       // Create the batch we will use to return data for this RG.
-      EncodedColumnBatch<OrcBatchKey> ecb = new EncodedColumnBatch<OrcBatchKey>(
-          new OrcBatchKey(fileId, stripeIx, rgIx), colRgs.length, 0);
+      OrcEncodedColumnBatch ecb = ECB_POOL.take();
+      ecb.init(fileId, stripeIx, rgIx, colRgs.length);
       boolean isRGSelected = true;
       for (int colIxMod = 0; colIxMod < colRgs.length; ++colIxMod) {
         if (colRgs[colIxMod] != null && !colRgs[colIxMod][rgIx]) {
@@ -305,7 +376,8 @@ public class EncodedReaderImpl implement
                   + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length);
             }
             if (sctx.stripeLevelStream == null) {
-              sctx.stripeLevelStream = new StreamBuffer(sctx.kind.getNumber());
+              sctx.stripeLevelStream = SB_POOL.take();
+              sctx.stripeLevelStream.init(sctx.kind.getNumber());
               // We will be using this for each RG while also sending RGs to processing.
               // To avoid buffers being unlocked, run refcount one ahead; we will not increase
               // it when building the last RG, so each RG processing will decref once, and the
@@ -333,7 +405,8 @@ public class EncodedReaderImpl implement
                     isCompressed, isLastRg, nextCOffsetRel, sctx.length, bufferSize);
             // See class comment about refcounts.
             long unlockUntilCOffset = sctx.offset + nextCOffsetRel;
-            cb = new StreamBuffer(sctx.kind.getNumber());
+            cb = SB_POOL.take();
+            cb.init(sctx.kind.getNumber());
             cb.incRef();
             if (DebugUtils.isTraceOrcEnabled()) {
               LOG.info("Getting data for column "+ ctx.colIx + " " + (isLastRg ? "last " : "")
@@ -356,13 +429,34 @@ public class EncodedReaderImpl implement
         consumer.consumeData(ecb);
       }
     }
+    releaseContexts(colCtxs);
+
     if (DebugUtils.isTraceOrcEnabled()) {
       LOG.info("Disk ranges after processing all the data "
           + RecordReaderUtils.stringifyDiskRanges(toRead.next));
     }
 
     // Release the unreleased buffers. See class comment about refcounts.
-    DiskRangeList current = toRead.next;
+    releaseInitialRefcounts(toRead.next);
+    InStream.releaseCacheChunksIntoObjectPool(toRead.next);
+  }
+
+
+  private void releaseContexts(ColumnReadContext[] colCtxs) {
+    // Return all contexts to the pools.
+    for (ColumnReadContext ctx : colCtxs) {
+      if (ctx == null) continue;
+      for (int i = 0; i < ctx.streamCount; ++i) {
+        StreamContext sctx = ctx.streams[i];
+        if (sctx == null) continue;
+        STREAMCTX_POOL.offer(sctx);
+      }
+      COLCTX_POOL.offer(ctx);
+    }
+  }
+
+
+  private void releaseInitialRefcounts(DiskRangeList current) {
     while (current != null) {
       DiskRangeList toFree = current;
       current = current.next;
@@ -378,6 +472,7 @@ public class EncodedReaderImpl implement
     }
   }
 
+
   @Override
   public void close() throws IOException {
     try {

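For context, the EncodedReaderImpl changes above replace per-call allocation of ColumnReadContext and StreamContext with a take/init/.../offer lifecycle against the new static pools. Below is a minimal sketch of that lifecycle, assuming it lives inside EncodedReaderImpl; the method name and control flow are hypothetical, while COLCTX_POOL, STREAMCTX_POOL, init() and reset() are taken from the patch.

  // Hypothetical caller illustrating the pooled-context lifecycle; not part of the commit.
  private void readColumnSketch(int colIx, ColumnEncoding encoding, RowIndex rowIndex) {
    ColumnReadContext ctx = COLCTX_POOL.take();   // reuse a recycled instance instead of new
    ctx.init(colIx, encoding, rowIndex);          // re-initialize its fields for this read
    try {
      // ... ctx.addStream(...) pulls StreamContexts from STREAMCTX_POOL internally,
      //     then the streams are read as in readEncodedColumns ...
    } finally {
      // Hand everything back; offer() is expected to run resetBeforeOffer() (hence the
      // helper name), which calls reset() so recycled objects drop their references.
      for (int i = 0; i < ctx.streamCount; ++i) {
        STREAMCTX_POOL.offer(ctx.streams[i]);
      }
      COLCTX_POOL.offer(ctx);
    }
  }

The same take/offer discipline appears in readEncodedColumns, which now calls releaseContexts(colCtxs) on both the early-return and the normal path.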
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1672782&r1=1672781&r2=1672782&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Fri Apr 10 22:33:43 2015
@@ -29,22 +29,52 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hive.common.DiskRange;
 import org.apache.hadoop.hive.common.DiskRangeList;
 import org.apache.hadoop.hive.llap.DebugUtils;
-import org.apache.hadoop.hive.llap.LogLevels;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
+import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.CacheChunkFactory;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;
 import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
+import org.apache.hive.common.util.FixedSizedObjectPool;
+import org.apache.hive.common.util.FixedSizedObjectPool.PoolObjectHelper;
 
 import com.google.common.annotations.VisibleForTesting;
 
 public abstract class InStream extends InputStream {
 
   private static final Log LOG = LogFactory.getLog(InStream.class);
-  private static final LogLevels LOGL = new LogLevels(LOG);
-
+  private static final FixedSizedObjectPool<TrackedCacheChunk> TCC_POOL =
+      new FixedSizedObjectPool<>(1024, new PoolObjectHelper<TrackedCacheChunk>() {
+        @Override
+        protected TrackedCacheChunk create() {
+          return new TrackedCacheChunk();
+        }
+        @Override
+        protected void resetBeforeOffer(TrackedCacheChunk t) {
+          t.reset();
+        }
+      });
+  private static final FixedSizedObjectPool<ProcCacheChunk> PCC_POOL =
+      new FixedSizedObjectPool<>(1024, new PoolObjectHelper<ProcCacheChunk>() {
+        @Override
+        protected ProcCacheChunk create() {
+          return new ProcCacheChunk();
+        }
+        @Override
+        protected void resetBeforeOffer(ProcCacheChunk t) {
+          t.reset();
+        }
+      });
+  final static CacheChunkFactory CC_FACTORY = new CacheChunkFactory() {
+        @Override
+        public DiskRangeList createCacheChunk(LlapMemoryBuffer buffer, long offset, long end) {
+          TrackedCacheChunk tcc = TCC_POOL.take();
+          tcc.init(buffer, offset, end);
+          return tcc;
+        }
+      };
   protected final Long fileId;
   protected final String name;
   protected long length;
@@ -539,20 +569,24 @@ public abstract class InStream extends I
     }
   }
 
-  public static class TrackedCacheChunkFactory implements LowLevelCache.CacheChunkFactory {
-    // TODO: in future, this can also be used as a pool
-    @Override
-    public DiskRangeList createCacheChunk(LlapMemoryBuffer buffer, long offset, long end) {
-      return new TrackedCacheChunk(buffer, offset, end);
-    }
-  }
-
   /** Cache chunk which tracks whether it has been fully read. See
       EncodedReaderImpl class comment about refcounts. */
   public static class TrackedCacheChunk extends CacheChunk {
     public boolean isReleased = false;
-    public TrackedCacheChunk(LlapMemoryBuffer buffer, long offset, long end) {
-      super(buffer, offset, end);
+    public TrackedCacheChunk() {
+      super(null, -1, -1);
+    }
+
+    public void init(LlapMemoryBuffer buffer, long offset, long end) {
+      this.buffer = buffer;
+      this.offset = offset;
+      this.end = end;
+    }
+
+    public void reset() {
+      this.buffer = null;
+      this.next = this.prev = null;
+      this.isReleased = false;
     }
   }
 
@@ -563,14 +597,20 @@ public abstract class InStream extends I
   * the DiskRange list created for the request, and everyone treats it like a regular CacheChunk.
    */
   private static class ProcCacheChunk extends TrackedCacheChunk {
-    public ProcCacheChunk(long cbStartOffset, long cbEndOffset, boolean isCompressed,
+    public void init(long cbStartOffset, long cbEndOffset, boolean isCompressed,
         ByteBuffer originalData, LlapMemoryBuffer targetBuffer, int originalCbIndex) {
-      super(targetBuffer, cbStartOffset, cbEndOffset);
+      super.init(targetBuffer, cbStartOffset, cbEndOffset);
       this.isCompressed = isCompressed;
       this.originalData = originalData;
       this.originalCbIndex = originalCbIndex;
     }
 
+    @Override
+    public void reset() {
+      super.reset();
+      this.originalData = null;
+    }
+
     boolean isCompressed;
     ByteBuffer originalData = null;
     int originalCbIndex;
@@ -595,12 +635,16 @@ public abstract class InStream extends I
   * @return Last buffer cached during decompression. Cache buffers are never removed from
    *         the master list, so they are safe to keep as iterators for various streams.
    */
-  // TODO#: move to EncodedReaderImpl
+  // TODO: move to EncodedReaderImpl
   public static DiskRangeList uncompressStream(long fileId, long baseOffset, DiskRangeList start,
       long cOffset, long endCOffset, ZeroCopyReaderShim zcr, CompressionCodec codec,
       int bufferSize, LowLevelCache cache, StreamBuffer streamBuffer, long unlockUntilCOffset)
           throws IOException {
-    streamBuffer.cacheBuffers = new ArrayList<LlapMemoryBuffer>();
+    if (streamBuffer.cacheBuffers == null) {
+      streamBuffer.cacheBuffers = new ArrayList<LlapMemoryBuffer>();
+    } else {
+      streamBuffer.cacheBuffers.clear();
+    }
     if (cOffset == endCOffset) return null;
     List<ProcCacheChunk> toDecompress = null;
     List<ByteBuffer> toRelease = null;
@@ -736,6 +780,17 @@ public abstract class InStream extends I
     return lastCached;
   }
 
+  public static void releaseCacheChunksIntoObjectPool(DiskRangeList current) {
+    while (current != null) {
+      if (current instanceof ProcCacheChunk) {
+        PCC_POOL.offer((ProcCacheChunk)current);
+      } else if (current instanceof TrackedCacheChunk) {
+        TCC_POOL.offer((TrackedCacheChunk)current);
+      }
+      current = current.next;
+    }
+  }
+
   private static void ponderReleaseInitialRefcount(LowLevelCache cache,
       long unlockUntilCOffset, TrackedCacheChunk cc) {
     if (cc.getEnd() > unlockUntilCOffset) return;
@@ -950,7 +1005,8 @@ public abstract class InStream extends I
     // Add it to result in order we are processing.
     cacheBuffers.add(futureAlloc);
     // Add it to the list of work to decompress.
-    ProcCacheChunk cc = new ProcCacheChunk(cbStartOffset, cbEndOffset, !isUncompressed,
+    ProcCacheChunk cc = PCC_POOL.take();
+    cc.init(cbStartOffset, cbEndOffset, !isUncompressed,
         fullCompressionBlock, futureAlloc, cacheBuffers.size() - 1);
     toDecompress.add(cc);
     // Adjust the compression block position.

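The InStream changes follow the same pattern for cache chunks: TrackedCacheChunk and ProcCacheChunk instances now come from TCC_POOL and PCC_POOL (via CC_FACTORY and PCC_POOL.take()) and are handed back in bulk once a read is fully processed. Below is a minimal caller-side sketch, assuming same-package access to InStream.CC_FACTORY; the method and its arguments are hypothetical, while getFileData and releaseCacheChunksIntoObjectPool are used as in the patch.

  // Hypothetical read path illustrating pooled cache-chunk handling; not part of the commit.
  void readWithPooledChunks(LowLevelCache cache, long fileId,
      DiskRangeList ranges, long baseOffset) throws IOException {
    // The cache materializes hits as TrackedCacheChunks taken from TCC_POOL
    // through InStream.CC_FACTORY instead of allocating new chunk objects.
    cache.getFileData(fileId, ranges, baseOffset, InStream.CC_FACTORY);

    // ... decompress and consume the data; uncompressStream splices in
    //     ProcCacheChunks taken from PCC_POOL as decompression work items ...

    // Once the request is done, a single walk returns every pooled chunk.
    InStream.releaseCacheChunksIntoObjectPool(ranges);
  }

EncodedReaderImpl.readEncodedColumns does exactly this at the end of its new cleanup sequence: releaseInitialRefcounts followed by releaseCacheChunksIntoObjectPool.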
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java?rev=1672782&r1=1672781&r2=1672782&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java Fri Apr 10 22:33:43 2015
@@ -147,7 +147,6 @@ public class OrcSplit extends FileSplit
     if (hasFileId) {
       fileId = in.readLong();
     }
-    LOG.error("TODO# Got file ID " + fileId + " for " + getPath());
   }
 
   FileMetaInfo getFileMetaInfo(){

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java?rev=1672782&r1=1672781&r2=1672782&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java Fri Apr 10 22:33:43 2015
@@ -23,9 +23,9 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.hadoop.hive.llap.Consumer;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
+import org.apache.hadoop.hive.ql.io.orc.EncodedReaderImpl.OrcEncodedColumnBatch;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 
@@ -318,7 +318,7 @@ public interface Reader {
   MetadataReader metadata() throws IOException;
 
   EncodedReader encodedReader(long fileId, LowLevelCache lowLevelCache,
-      Consumer<EncodedColumnBatch<OrcBatchKey>> consumer) throws IOException;
+      Consumer<OrcEncodedColumnBatch> consumer) throws IOException;
 
   List<Integer> getVersionList();
 

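The Reader interface change narrows the consumer type from Consumer<EncodedColumnBatch<OrcBatchKey>> to Consumer<OrcEncodedColumnBatch>, so downstream code sees the concrete, poolable batch type. One plausible use is sketched below, under the assumption that Consumer declares consumeData as it is called in EncodedReaderImpl; the class itself is hypothetical, and StreamBuffer recycling (which is refcount-driven in the patch) is intentionally omitted.

  // Hypothetical downstream consumer; not part of the commit.
  class RecyclingConsumerSketch {
    public void consumeData(EncodedReaderImpl.OrcEncodedColumnBatch ecb) {
      try {
        // ... decode the column streams referenced by the batch ...
      } finally {
        // With the concrete type visible, the finished batch can be returned to
        // the public pool for reuse instead of becoming garbage.
        EncodedReaderImpl.ECB_POOL.offer(ecb);
      }
    }
  }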
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1672782&r1=1672781&r2=1672782&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Fri Apr 10 22:33:43 2015
@@ -38,9 +38,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.DiskRange;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.Consumer;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
+import org.apache.hadoop.hive.ql.io.orc.EncodedReaderImpl.OrcEncodedColumnBatch;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterVersion;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.Footer;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
@@ -716,7 +716,7 @@ public class ReaderImpl implements Reade
 
   @Override
   public EncodedReader encodedReader(long fileId, LowLevelCache lowLevelCache,
-      Consumer<EncodedColumnBatch<OrcBatchKey>> consumer) throws IOException {
+      Consumer<OrcEncodedColumnBatch> consumer) throws IOException {
     boolean useZeroCopy = (conf != null) && (HiveConf.getBoolVar(conf, HIVE_ORC_ZEROCOPY));
     return new EncodedReaderImpl(fileSystem, path, fileId, useZeroCopy, types,
         codec, bufferSize, rowIndexStride, lowLevelCache, consumer);


