Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2EB9D200BFA for ; Thu, 12 Jan 2017 18:21:47 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 2D529160B40; Thu, 12 Jan 2017 17:21:47 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2ADC6160B29 for ; Thu, 12 Jan 2017 18:21:46 +0100 (CET) Received: (qmail 69698 invoked by uid 500); 12 Jan 2017 17:21:45 -0000 Mailing-List: contact commits-help@arrow.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@arrow.apache.org Delivered-To: mailing list commits@arrow.apache.org Received: (qmail 69688 invoked by uid 99); 12 Jan 2017 17:21:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 12 Jan 2017 17:21:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 35B1FDFA22; Thu, 12 Jan 2017 17:21:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wesm@apache.org To: commits@arrow.apache.org Message-Id: <9f9b689ee84e4ba49fd602bd4a46a522@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: arrow git commit: ARROW-385: Refactors metric system Date: Thu, 12 Jan 2017 17:21:45 +0000 (UTC) archived-at: Thu, 12 Jan 2017 17:21:47 -0000 Repository: arrow Updated Branches: refs/heads/master 7d3e2a3ab -> c5663c6d0 ARROW-385: Refactors metric system Arrow has some support for metrics, but the metrics registry is by default not configured to export values. It also forces users to use the yammer/codahale metrics library instead of the library of their choice. 
To allow for integration with other metrics systems, replace it with a notification mechanism that alerts users on allocation/deallocation. Author: Laurent Goujon Closes #212 from laurentgo/laurent/metrics-refactoring and squashes the following commits: e6c435b [Laurent Goujon] ARROW-385: Refactors metric system Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/c5663c6d Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/c5663c6d Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/c5663c6d Branch: refs/heads/master Commit: c5663c6d00dbd297dac573670156e26dc0593357 Parents: 7d3e2a3 Author: Laurent Goujon Authored: Thu Jan 12 12:21:37 2017 -0500 Committer: Wes McKinney Committed: Thu Jan 12 12:21:37 2017 -0500 ---------------------------------------------------------------------- java/memory/pom.xml | 7 - .../main/java/io/netty/buffer/LargeBuffer.java | 31 +--- .../netty/buffer/PooledByteBufAllocatorL.java | 157 +++++++++---------- .../netty/buffer/UnsafeDirectLittleEndian.java | 34 +--- .../apache/arrow/memory/AllocationListener.java | 40 +++++ .../apache/arrow/memory/AllocationManager.java | 13 +- .../org/apache/arrow/memory/BaseAllocator.java | 30 +++- .../org/apache/arrow/memory/RootAllocator.java | 7 +- .../org/apache/arrow/memory/util/Metrics.java | 40 ----- .../org/apache/arrow/memory/util/Pointer.java | 28 ---- 10 files changed, 158 insertions(+), 229 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/pom.xml ---------------------------------------------------------------------- diff --git a/java/memory/pom.xml b/java/memory/pom.xml index 6ed1448..a4eb652 100644 --- a/java/memory/pom.xml +++ b/java/memory/pom.xml @@ -20,13 +20,6 @@ Arrow Memory - - - com.codahale.metrics - metrics-core - 3.0.1 - - com.google.code.findbugs jsr305 
http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java ---------------------------------------------------------------------- diff --git a/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java b/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java index 5f5e904..c026e43 100644 --- a/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java +++ b/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java @@ -17,43 +17,16 @@ */ package io.netty.buffer; -import java.util.concurrent.atomic.AtomicLong; - /** * A MutableWrappedByteBuf that also maintains a metric of the number of huge buffer bytes and counts. */ public class LargeBuffer extends MutableWrappedByteBuf { - - private final AtomicLong hugeBufferSize; - private final AtomicLong hugeBufferCount; - - private final int initCap; - - public LargeBuffer(ByteBuf buffer, AtomicLong hugeBufferSize, AtomicLong hugeBufferCount) { + public LargeBuffer(ByteBuf buffer) { super(buffer); - initCap = buffer.capacity(); - this.hugeBufferCount = hugeBufferCount; - this.hugeBufferSize = hugeBufferSize; } @Override public ByteBuf copy(int index, int length) { - return new LargeBuffer(buffer.copy(index, length), hugeBufferSize, hugeBufferCount); + return new LargeBuffer(buffer.copy(index, length)); } - - @Override - public boolean release() { - return release(1); - } - - @Override - public boolean release(int decrement) { - boolean released = unwrap().release(decrement); - if (released) { - hugeBufferSize.addAndGet(-initCap); - hugeBufferCount.decrementAndGet(); - } - return released; - } - } http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java ---------------------------------------------------------------------- diff --git a/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java index 
f6feb65..a843ac5 100644 --- a/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java +++ b/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java @@ -17,7 +17,7 @@ */ package io.netty.buffer; -import io.netty.util.internal.StringUtil; +import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED; import java.lang.reflect.Field; import java.nio.ByteBuffer; @@ -25,24 +25,16 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.arrow.memory.OutOfMemoryException; -import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Metric; -import com.codahale.metrics.MetricFilter; -import com.codahale.metrics.MetricRegistry; +import io.netty.util.internal.StringUtil; /** * The base allocator that we use for all of Arrow's memory management. Returns UnsafeDirectLittleEndian buffers. */ public class PooledByteBufAllocatorL { - private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("drill.allocator"); + private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow.allocator"); private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60; - - public static final String METRIC_PREFIX = "drill.allocator."; - - private final MetricRegistry registry; private final AtomicLong hugeBufferSize = new AtomicLong(0); private final AtomicLong hugeBufferCount = new AtomicLong(0); private final AtomicLong normalBufferSize = new AtomicLong(0); @@ -51,8 +43,7 @@ public class PooledByteBufAllocatorL { private final InnerAllocator allocator; public final UnsafeDirectLittleEndian empty; - public PooledByteBufAllocatorL(MetricRegistry registry) { - this.registry = registry; + public PooledByteBufAllocatorL() { allocator = new InnerAllocator(); empty = new UnsafeDirectLittleEndian(new DuplicatedByteBuf(Unpooled.EMPTY_BUFFER)); } @@ -70,13 +61,66 @@ public class PooledByteBufAllocatorL { return allocator.chunkSize; } - private class 
InnerAllocator extends PooledByteBufAllocator { + public long getHugeBufferSize() { + return hugeBufferSize.get(); + } + public long getHugeBufferCount() { + return hugeBufferCount.get(); + } + public long getNormalBufferSize() { + return normalBufferSize.get(); + } + + public long getNormalBufferCount() { + return normalBufferSize.get(); + } + + private static class AccountedUnsafeDirectLittleEndian extends UnsafeDirectLittleEndian { + private final long initialCapacity; + private final AtomicLong count; + private final AtomicLong size; + + private AccountedUnsafeDirectLittleEndian(LargeBuffer buf, AtomicLong count, AtomicLong size) { + super(buf); + this.initialCapacity = buf.capacity(); + this.count = count; + this.size = size; + } + + private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count, AtomicLong size) { + super(buf); + this.initialCapacity = buf.capacity(); + this.count = count; + this.size = size; + } + + @Override + public ByteBuf copy() { + throw new UnsupportedOperationException("copy method is not supported"); + } + + @Override + public ByteBuf copy(int index, int length) { + throw new UnsupportedOperationException("copy method is not supported"); + } + + @Override + public boolean release(int decrement) { + boolean released = super.release(decrement); + if (released) { + count.decrementAndGet(); + size.addAndGet(-initialCapacity); + } + return released; + } + + } + + private class InnerAllocator extends PooledByteBufAllocator { private final PoolArena[] directArenas; private final MemoryStatusThread statusThread; - private final Histogram largeBuffersHist; - private final Histogram normalBuffersHist; private final int chunkSize; public InnerAllocator() { @@ -98,50 +142,6 @@ public class PooledByteBufAllocatorL { } else { statusThread = null; } - removeOldMetrics(); - - registry.register(METRIC_PREFIX + "normal.size", new Gauge() { - @Override - public Long getValue() { - return normalBufferSize.get(); - } - }); - - 
registry.register(METRIC_PREFIX + "normal.count", new Gauge() { - @Override - public Long getValue() { - return normalBufferCount.get(); - } - }); - - registry.register(METRIC_PREFIX + "huge.size", new Gauge() { - @Override - public Long getValue() { - return hugeBufferSize.get(); - } - }); - - registry.register(METRIC_PREFIX + "huge.count", new Gauge() { - @Override - public Long getValue() { - return hugeBufferCount.get(); - } - }); - - largeBuffersHist = registry.histogram(METRIC_PREFIX + "huge.hist"); - normalBuffersHist = registry.histogram(METRIC_PREFIX + "normal.hist"); - - } - - - private synchronized void removeOldMetrics() { - registry.removeMatching(new MetricFilter() { - @Override - public boolean matches(String name, Metric metric) { - return name.startsWith("drill.allocator."); - } - - }); } private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCapacity) { @@ -154,12 +154,11 @@ public class PooledByteBufAllocatorL { // This is beyond chunk size so we'll allocate separately. ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity); - hugeBufferCount.incrementAndGet(); hugeBufferSize.addAndGet(buf.capacity()); - largeBuffersHist.update(buf.capacity()); - // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception()); - return new UnsafeDirectLittleEndian(new LargeBuffer(buf, hugeBufferSize, hugeBufferCount)); + hugeBufferCount.incrementAndGet(); + // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception()); + return new AccountedUnsafeDirectLittleEndian(new LargeBuffer(buf), hugeBufferCount, hugeBufferSize); } else { // within chunk, use arena. 
ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity); @@ -167,14 +166,14 @@ public class PooledByteBufAllocatorL { fail(); } - normalBuffersHist.update(buf.capacity()); - if (ASSERT_ENABLED) { - normalBufferSize.addAndGet(buf.capacity()); - normalBufferCount.incrementAndGet(); + if (!ASSERT_ENABLED) { + return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf); } - return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, normalBufferCount, - normalBufferSize); + normalBufferSize.addAndGet(buf.capacity()); + normalBufferCount.incrementAndGet(); + + return new AccountedUnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, normalBufferCount, normalBufferSize); } } else { @@ -184,9 +183,10 @@ public class PooledByteBufAllocatorL { private UnsupportedOperationException fail() { return new UnsupportedOperationException( - "Arrow requries that the JVM used supports access sun.misc.Unsafe. This platform didn't provide that functionality."); + "Arrow requires that the JVM used supports access sun.misc.Unsafe. 
This platform didn't provide that functionality."); } + @Override public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int maxCapacity) { if (initialCapacity == 0 && maxCapacity == 0) { newDirectBuffer(initialCapacity, maxCapacity); @@ -215,9 +215,8 @@ public class PooledByteBufAllocatorL { private class MemoryStatusThread extends Thread { public MemoryStatusThread() { - super("memory-status-logger"); + super("allocation.logger"); this.setDaemon(true); - this.setName("allocation.logger"); } @Override @@ -229,12 +228,11 @@ public class PooledByteBufAllocatorL { } catch (InterruptedException e) { return; } - } } - } + @Override public String toString() { StringBuilder buf = new StringBuilder(); buf.append(directArenas.length); @@ -260,13 +258,4 @@ public class PooledByteBufAllocatorL { } - - public static final boolean ASSERT_ENABLED; - - static { - boolean isAssertEnabled = false; - assert isAssertEnabled = true; - ASSERT_ENABLED = isAssertEnabled; - } - } http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java ---------------------------------------------------------------------- diff --git a/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java b/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java index 023a6a2..5ea1767 100644 --- a/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java +++ b/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java @@ -18,8 +18,6 @@ package io.netty.buffer; -import io.netty.util.internal.PlatformDependent; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -32,7 +30,7 @@ import io.netty.util.internal.PlatformDependent; * The underlying class we use for little-endian access to memory. Is used underneath ArrowBufs to abstract away the * Netty classes and underlying Netty memory management. 
*/ -public final class UnsafeDirectLittleEndian extends WrappedByteBuf { +public class UnsafeDirectLittleEndian extends WrappedByteBuf { private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN; private static final AtomicLong ID_GENERATOR = new AtomicLong(0); @@ -40,35 +38,25 @@ public final class UnsafeDirectLittleEndian extends WrappedByteBuf { private final AbstractByteBuf wrapped; private final long memoryAddress; - private final AtomicLong bufferCount; - private final AtomicLong bufferSize; - private final long initCap; - UnsafeDirectLittleEndian(DuplicatedByteBuf buf) { - this(buf, true, null, null); + this(buf, true); } UnsafeDirectLittleEndian(LargeBuffer buf) { - this(buf, true, null, null); + this(buf, true); } - UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong bufferCount, AtomicLong bufferSize) { - this(buf, true, bufferCount, bufferSize); + UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf) { + this(buf, true); } - private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake, AtomicLong bufferCount, AtomicLong bufferSize) { + private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake) { super(buf); if (!NATIVE_ORDER || buf.order() != ByteOrder.BIG_ENDIAN) { throw new IllegalStateException("Arrow only runs on LittleEndian systems."); } - this.bufferCount = bufferCount; - this.bufferSize = bufferSize; - - // initCap is used if we're tracking memory release. If we're in non-debug mode, we'll skip this. - this.initCap = ASSERT_ENABLED ? 
buf.capacity() : -1; - this.wrapped = buf; this.memoryAddress = buf.memoryAddress(); } @@ -245,16 +233,6 @@ public final class UnsafeDirectLittleEndian extends WrappedByteBuf { } @Override - public boolean release(int decrement) { - final boolean released = super.release(decrement); - if (ASSERT_ENABLED && released && bufferCount != null && bufferSize != null) { - bufferCount.decrementAndGet(); - bufferSize.addAndGet(-initCap); - } - return released; - } - - @Override public int setBytes(int index, InputStream in, int length) throws IOException { wrapped.checkIndex(index, length); byte[] tmp = new byte[length]; http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java ---------------------------------------------------------------------- diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java new file mode 100644 index 0000000..1b127f8 --- /dev/null +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.arrow.memory; + +/** + * An allocation listener being notified for allocation/deallocation + * + * It is expected to be called from multiple threads and as such, + * provider should take care of making the implementation thread-safe + */ +public interface AllocationListener { + public static final AllocationListener NOOP = new AllocationListener() { + @Override + public void onAllocation(long size) { + } + }; + + /** + * Called each time a new buffer is allocated + * + * @param size the buffer size being allocated + */ + void onAllocation(long size); + +} http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java ---------------------------------------------------------------------- diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java index 43ee9c1..f15bb8a 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java @@ -18,9 +18,6 @@ package org.apache.arrow.memory; import static org.apache.arrow.memory.BaseAllocator.indent; -import io.netty.buffer.ArrowBuf; -import io.netty.buffer.PooledByteBufAllocatorL; -import io.netty.buffer.UnsafeDirectLittleEndian; import java.util.IdentityHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -31,10 +28,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.arrow.memory.BaseAllocator.Verbosity; import org.apache.arrow.memory.util.AutoCloseableLock; import org.apache.arrow.memory.util.HistoricalLog; -import org.apache.arrow.memory.util.Metrics; import com.google.common.base.Preconditions; +import io.netty.buffer.ArrowBuf; +import io.netty.buffer.PooledByteBufAllocatorL; +import io.netty.buffer.UnsafeDirectLittleEndian; + /** * Manages the relationship between one or more allocators and 
a particular UDLE. Ensures that one allocator owns the * memory that multiple allocators may be referencing. Manages a BufferLedger between each of its associated allocators. @@ -56,7 +56,10 @@ public class AllocationManager { private static final AtomicLong MANAGER_ID_GENERATOR = new AtomicLong(0); private static final AtomicLong LEDGER_ID_GENERATOR = new AtomicLong(0); - static final PooledByteBufAllocatorL INNER_ALLOCATOR = new PooledByteBufAllocatorL(Metrics.getInstance()); + private static final PooledByteBufAllocatorL INNER_ALLOCATOR = new PooledByteBufAllocatorL(); + + static final UnsafeDirectLittleEndian EMPTY = INNER_ALLOCATOR.empty; + static final long CHUNK_SIZE = INNER_ALLOCATOR.getChunkSize(); private final RootAllocator root; private final long allocatorManagerId = MANAGER_ID_GENERATOR.incrementAndGet(); http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java ---------------------------------------------------------------------- diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java index dbb0705..9edafbc 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java @@ -21,7 +21,6 @@ import java.util.Arrays; import java.util.IdentityHashMap; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import org.apache.arrow.memory.AllocationManager.BufferLedger; import org.apache.arrow.memory.util.AssertionUtil; @@ -37,14 +36,12 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato public static final String DEBUG_ALLOCATOR = "arrow.memory.debug.allocator"; - private static final AtomicLong ID_GENERATOR = new AtomicLong(0); - private static final int CHUNK_SIZE = 
AllocationManager.INNER_ALLOCATOR.getChunkSize(); - public static final int DEBUG_LOG_LENGTH = 6; public static final boolean DEBUG = AssertionUtil.isAssertionsEnabled() || Boolean.parseBoolean(System.getProperty(DEBUG_ALLOCATOR, "false")); private final Object DEBUG_LOCK = DEBUG ? new Object() : null; + private final AllocationListener listener; private final BaseAllocator parentAllocator; private final ArrowByteBufAllocator thisAsByteBufAllocator; private final IdentityHashMap childAllocators; @@ -62,12 +59,31 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato private final HistoricalLog historicalLog; protected BaseAllocator( + final AllocationListener listener, + final String name, + final long initReservation, + final long maxAllocation) throws OutOfMemoryException { + this(listener, null, name, initReservation, maxAllocation); + } + + protected BaseAllocator( + final BaseAllocator parentAllocator, + final String name, + final long initReservation, + final long maxAllocation) throws OutOfMemoryException { + this(parentAllocator.listener, parentAllocator, name, initReservation, maxAllocation); + } + + private BaseAllocator( + final AllocationListener listener, final BaseAllocator parentAllocator, final String name, final long initReservation, final long maxAllocation) throws OutOfMemoryException { super(parentAllocator, initReservation, maxAllocation); + this.listener = listener; + if (parentAllocator != null) { this.root = parentAllocator.root; empty = parentAllocator.empty; @@ -192,7 +208,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato private ArrowBuf createEmpty(){ assertOpen(); - return new ArrowBuf(new AtomicInteger(), null, AllocationManager.INNER_ALLOCATOR.empty, null, null, 0, 0, true); + return new ArrowBuf(new AtomicInteger(), null, AllocationManager.EMPTY, null, null, 0, 0, true); } @Override @@ -206,7 +222,7 @@ public abstract class BaseAllocator extends Accountant implements 
BufferAllocato } // round to next largest power of two if we're within a chunk since that is how our allocator operates - final int actualRequestSize = initialRequestSize < CHUNK_SIZE ? + final int actualRequestSize = initialRequestSize < AllocationManager.CHUNK_SIZE ? nextPowerOfTwo(initialRequestSize) : initialRequestSize; AllocationOutcome outcome = this.allocateBytes(actualRequestSize); @@ -218,6 +234,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato try { ArrowBuf buffer = bufferWithoutReservation(actualRequestSize, manager); success = true; + listener.onAllocation(actualRequestSize); return buffer; } finally { if (!success) { @@ -405,6 +422,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato try { final ArrowBuf arrowBuf = BaseAllocator.this.bufferWithoutReservation(nBytes, null); + listener.onAllocation(nBytes); if (DEBUG) { historicalLog.recordEvent("allocate() => %s", String.format("ArrowBuf[%d]", arrowBuf.getId())); } http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java ---------------------------------------------------------------------- diff --git a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java index 571fc37..57a2c0c 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java @@ -24,9 +24,12 @@ import com.google.common.annotations.VisibleForTesting; * tree of descendant child allocators. 
*/ public class RootAllocator extends BaseAllocator { - public RootAllocator(final long limit) { - super(null, "ROOT", 0, limit); + this(AllocationListener.NOOP, limit); + } + + public RootAllocator(final AllocationListener listener, final long limit) { + super(listener, "ROOT", 0, limit); } /** http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/org/apache/arrow/memory/util/Metrics.java ---------------------------------------------------------------------- diff --git a/java/memory/src/main/java/org/apache/arrow/memory/util/Metrics.java b/java/memory/src/main/java/org/apache/arrow/memory/util/Metrics.java deleted file mode 100644 index 5177a24..0000000 --- a/java/memory/src/main/java/org/apache/arrow/memory/util/Metrics.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.arrow.memory.util; - -import com.codahale.metrics.MetricRegistry; - -public class Metrics { - - private Metrics() { - - } - - private static class RegistryHolder { - public static final MetricRegistry REGISTRY; - - static { - REGISTRY = new MetricRegistry(); - } - - } - - public static MetricRegistry getInstance() { - return RegistryHolder.REGISTRY; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/org/apache/arrow/memory/util/Pointer.java ---------------------------------------------------------------------- diff --git a/java/memory/src/main/java/org/apache/arrow/memory/util/Pointer.java b/java/memory/src/main/java/org/apache/arrow/memory/util/Pointer.java deleted file mode 100644 index 58ab13b..0000000 --- a/java/memory/src/main/java/org/apache/arrow/memory/util/Pointer.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.arrow.memory.util; - -public class Pointer { - public T value; - - public Pointer(){} - - public Pointer(T value){ - this.value = value; - } -}