drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From julienledem <...@git.apache.org>
Subject [GitHub] drill pull request: DRILL-4134: Allocator updates
Date Tue, 01 Dec 2015 17:57:00 GMT
Github user julienledem commented on a diff in the pull request:

    https://github.com/apache/drill/pull/283#discussion_r46312998
  
    --- Diff: exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
---
    @@ -23,193 +23,246 @@
     import java.nio.ByteBuffer;
     import java.util.concurrent.atomic.AtomicLong;
     
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +
     import com.codahale.metrics.Gauge;
     import com.codahale.metrics.Histogram;
     import com.codahale.metrics.Metric;
     import com.codahale.metrics.MetricFilter;
     import com.codahale.metrics.MetricRegistry;
     
    -public class PooledByteBufAllocatorL extends PooledByteBufAllocator{
    -  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PooledByteBufAllocatorL.class);
    -
    +public class PooledByteBufAllocatorL {
       private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("drill.allocator");
    +
       private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;
     
    +
       private static final String METRIC_PREFIX = "drill.allocator.";
    +
       private final MetricRegistry registry;
       private final AtomicLong hugeBufferSize = new AtomicLong(0);
       private final AtomicLong hugeBufferCount = new AtomicLong(0);
       private final AtomicLong normalBufferSize = new AtomicLong(0);
       private final AtomicLong normalBufferCount = new AtomicLong(0);
     
    -  private final PoolArena<ByteBuffer>[] directArenas;
    -  private final MemoryStatusThread statusThread;
    -  private final Histogram largeBuffersHist;
    -  private final Histogram normalBuffersHist;
    +  public final InnerAllocator allocator;
    +  public final UnsafeDirectLittleEndian empty;
     
       public PooledByteBufAllocatorL(MetricRegistry registry) {
    -    super(true);
         this.registry = registry;
    +    allocator = new InnerAllocator();
    +    empty = new UnsafeDirectLittleEndian(new DuplicatedByteBuf(Unpooled.EMPTY_BUFFER));
    +  }
    +
    +  public UnsafeDirectLittleEndian allocate(int size) {
         try {
    -      Field f = PooledByteBufAllocator.class.getDeclaredField("directArenas");
    -      f.setAccessible(true);
    -      this.directArenas = (PoolArena<ByteBuffer>[]) f.get(this);
    -    } catch (Exception e) {
    -      throw new RuntimeException("Failure while initializing allocator.  Unable to retrieve
direct arenas field.", e);
    +      return allocator.directBuffer(size, size);
    +    } catch (OutOfMemoryError e) {
    +      throw new OutOfMemoryException("Failure allocating buffer.", e);
         }
     
    -    if (memoryLogger.isTraceEnabled()) {
    -      statusThread = new MemoryStatusThread();
    -      statusThread.start();
    -    } else {
    -      statusThread = null;
    -    }
    -    removeOldMetrics();
    +  }
     
    -    registry.register(METRIC_PREFIX + "normal.size", new Gauge<Long>() {
    -      @Override
    -      public Long getValue() {
    -        return normalBufferSize.get();
    -      }
    -    });
    +  public int getChunkSize() {
    +    return allocator.chunkSize;
    +  }
     
    -    registry.register(METRIC_PREFIX + "normal.count", new Gauge<Long>() {
    -      @Override
    -      public Long getValue() {
    -        return normalBufferCount.get();
    -      }
    -    });
    +  private class InnerAllocator extends PooledByteBufAllocator {
     
    -    registry.register(METRIC_PREFIX + "huge.size", new Gauge<Long>() {
    -      @Override
    -      public Long getValue() {
    -        return hugeBufferSize.get();
    -      }
    -    });
     
    -    registry.register(METRIC_PREFIX + "huge.count", new Gauge<Long>() {
    -      @Override
    -      public Long getValue() {
    -        return hugeBufferCount.get();
    -      }
    -    });
    +    private final PoolArena<ByteBuffer>[] directArenas;
    +    private final MemoryStatusThread statusThread;
    +    private final Histogram largeBuffersHist;
    +    private final Histogram normalBuffersHist;
    +    private final int chunkSize;
     
    -    largeBuffersHist = registry.histogram(METRIC_PREFIX + "huge.hist");
    -    normalBuffersHist = registry.histogram(METRIC_PREFIX + "normal.hist");
    +    public InnerAllocator() {
    +      super(true);
     
    -  }
    +      try {
    +        Field f = PooledByteBufAllocator.class.getDeclaredField("directArenas");
    +        f.setAccessible(true);
    +        this.directArenas = (PoolArena<ByteBuffer>[]) f.get(this);
    +      } catch (Exception e) {
    +        throw new RuntimeException("Failure while initializing allocator.  Unable to
retrieve direct arenas field.", e);
    +      }
     
    -  private synchronized void removeOldMetrics() {
    -    registry.removeMatching(new MetricFilter() {
    -      @Override
    -      public boolean matches(String name, Metric metric) {
    -        return name.startsWith("drill.allocator.");
    +      this.chunkSize = directArenas[0].chunkSize;
    +
    +      if (memoryLogger.isTraceEnabled()) {
    +        statusThread = new MemoryStatusThread();
    +        statusThread.start();
    +      } else {
    +        statusThread = null;
           }
    +      removeOldMetrics();
     
    -    });
    -  }
    +      registry.register(METRIC_PREFIX + "normal.size", new Gauge<Long>() {
    +        @Override
    +        public Long getValue() {
    +          return normalBufferSize.get();
    +        }
    +      });
     
    -  @Override
    -  protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    -    throw new UnsupportedOperationException("Drill doesn't support using heap buffers.");
    -  }
    +      registry.register(METRIC_PREFIX + "normal.count", new Gauge<Long>() {
    +        @Override
    +        public Long getValue() {
    +          return normalBufferCount.get();
    +        }
    +      });
    +
    +      registry.register(METRIC_PREFIX + "huge.size", new Gauge<Long>() {
    +        @Override
    +        public Long getValue() {
    +          return hugeBufferSize.get();
    +        }
    +      });
     
    -  @Override
    -  protected UnsafeDirectLittleEndian newDirectBuffer(int initialCapacity, int maxCapacity)
{
    -    PoolThreadCache cache = threadCache.get();
    -    PoolArena<ByteBuffer> directArena = cache.directArena;
    +      registry.register(METRIC_PREFIX + "huge.count", new Gauge<Long>() {
    +        @Override
    +        public Long getValue() {
    +          return hugeBufferCount.get();
    +        }
    +      });
     
    -    if (directArena != null) {
    +      largeBuffersHist = registry.histogram(METRIC_PREFIX + "huge.hist");
    +      normalBuffersHist = registry.histogram(METRIC_PREFIX + "normal.hist");
     
    -      if (initialCapacity > directArena.chunkSize) {
    -        // This is beyond chunk size so we'll allocate separately.
    -        ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity,
maxCapacity);
    +    }
     
    -        hugeBufferCount.incrementAndGet();
    -        hugeBufferSize.addAndGet(buf.capacity());
    -        largeBuffersHist.update(buf.capacity());
    -        // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception());
    -        return new UnsafeDirectLittleEndian(new LargeBuffer(buf, hugeBufferSize, hugeBufferCount));
     
    -      } else {
    -        // within chunk, use arena.
    -        ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    -        if (!(buf instanceof PooledUnsafeDirectByteBuf)) {
    -          fail();
    +    private synchronized void removeOldMetrics() {
    +      registry.removeMatching(new MetricFilter() {
    +        @Override
    +        public boolean matches(String name, Metric metric) {
    +          return name.startsWith("drill.allocator.");
             }
     
    -        normalBuffersHist.update(buf.capacity());
    -        if (ASSERT_ENABLED) {
    -          normalBufferSize.addAndGet(buf.capacity());
    -          normalBufferCount.incrementAndGet();
    +      });
    +    }
    +
    +    private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCapacity)
{
    +      PoolThreadCache cache = threadCache.get();
    +      PoolArena<ByteBuffer> directArena = cache.directArena;
    +
    +      if (directArena != null) {
    +
    +        if (initialCapacity > directArena.chunkSize) {
    +          // This is beyond chunk size so we'll allocate separately.
    +          ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity,
maxCapacity);
    +
    +          hugeBufferCount.incrementAndGet();
    +          hugeBufferSize.addAndGet(buf.capacity());
    +          largeBuffersHist.update(buf.capacity());
    +          // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception());
    +          return new UnsafeDirectLittleEndian(new LargeBuffer(buf, hugeBufferSize, hugeBufferCount));
    +
    +        } else {
    +          // within chunk, use arena.
    +          ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    +          if (!(buf instanceof PooledUnsafeDirectByteBuf)) {
    +            fail();
    +          }
    +
    +          normalBuffersHist.update(buf.capacity());
    +          if (ASSERT_ENABLED) {
    +            normalBufferSize.addAndGet(buf.capacity());
    +            normalBufferCount.incrementAndGet();
    +          }
    +
    +          return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, normalBufferCount,
    +              normalBufferSize);
             }
     
    -        return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, normalBufferCount,
normalBufferSize);
    +      } else {
    +        throw fail();
           }
    -
    -    } else {
    -      throw fail();
         }
    -  }
    -
    -  private UnsupportedOperationException fail() {
    -    return new UnsupportedOperationException(
    -        "Drill requries that the JVM used supports access sun.misc.Unsafe.  This platform
didn't provide that functionality.");
    -  }
     
    +    private UnsupportedOperationException fail() {
    +      return new UnsupportedOperationException(
    +          "Drill requries that the JVM used supports access sun.misc.Unsafe.  This platform
didn't provide that functionality.");
    +    }
     
    -  @Override
    -  public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int maxCapacity)
{
    +    public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int maxCapacity)
{
           if (initialCapacity == 0 && maxCapacity == 0) {
    -          newDirectBuffer(initialCapacity, maxCapacity);
    +        newDirectBuffer(initialCapacity, maxCapacity);
           }
           validate(initialCapacity, maxCapacity);
    -      return newDirectBuffer(initialCapacity, maxCapacity);
    -  }
    +      return newDirectBufferL(initialCapacity, maxCapacity);
    +    }
     
    -  @Override
    -  public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
    -    throw new UnsupportedOperationException("Drill doesn't support using heap buffers.");
    -  }
    +    @Override
    +    public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
    +      throw new UnsupportedOperationException("Drill doesn't support using heap buffers.");
    +    }
     
     
    -  private static void validate(int initialCapacity, int maxCapacity) {
    -    if (initialCapacity < 0) {
    +    private void validate(int initialCapacity, int maxCapacity) {
    +      if (initialCapacity < 0) {
             throw new IllegalArgumentException("initialCapacity: " + initialCapacity + "
(expectd: 0+)");
    -    }
    -    if (initialCapacity > maxCapacity) {
    +      }
    +      if (initialCapacity > maxCapacity) {
             throw new IllegalArgumentException(String.format(
    -                "initialCapacity: %d (expected: not greater than maxCapacity(%d)",
    -                initialCapacity, maxCapacity));
    +            "initialCapacity: %d (expected: not greater than maxCapacity(%d)",
    +            initialCapacity, maxCapacity));
    +      }
         }
    -  }
     
    -  private class MemoryStatusThread extends Thread {
    +    private class MemoryStatusThread extends Thread {
     
    -    public MemoryStatusThread() {
    -      super("memory-status-logger");
    -      this.setDaemon(true);
    -      this.setName("allocation.logger");
    -    }
    +      public MemoryStatusThread() {
    +        super("memory-status-logger");
    +        this.setDaemon(true);
    +        this.setName("allocation.logger");
    +      }
     
    -    @Override
    -    public void run() {
    -      while (true) {
    -        memoryLogger.trace("Memory Usage: \n{}", PooledByteBufAllocatorL.this.toString());
    -        try {
    -          Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000);
    -        } catch (InterruptedException e) {
    -          return;
    -        }
    +      @Override
    +      public void run() {
    +        while (true) {
    +          memoryLogger.trace("Memory Usage: \n{}", PooledByteBufAllocatorL.this.toString());
    +          try {
    +            Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000);
    +          } catch (InterruptedException e) {
    +            return;
    +          }
     
    +        }
           }
    +
         }
     
    -  }
    +    public void checkAndReset() {
    +      if (hugeBufferCount.get() != 0 || normalBufferCount.get() != 0) {
    +        StringBuilder buf = new StringBuilder();
    +        buf.append("Large buffers outstanding: ");
    --- End diff --
    
    Also: should checkAndReset() reuse toString() here rather than building this message itself?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message