From commits-return-1963-archive-asf-public=cust-asf.ponee.io@orc.apache.org Tue Oct 8 22:49:58 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 913E3180654 for ; Wed, 9 Oct 2019 00:49:58 +0200 (CEST) Received: (qmail 26644 invoked by uid 500); 8 Oct 2019 22:49:58 -0000 Mailing-List: contact commits-help@orc.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@orc.apache.org Delivered-To: mailing list commits@orc.apache.org Received: (qmail 26631 invoked by uid 99); 8 Oct 2019 22:49:58 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Oct 2019 22:49:57 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id D3E89890A2; Tue, 8 Oct 2019 22:49:57 +0000 (UTC) Date: Tue, 08 Oct 2019 22:49:57 +0000 To: "commits@orc.apache.org" Subject: [orc] branch master updated: ORC-361: Remove the single thread restriction on the MemoryManagerImpl. MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <157057499775.29867.12700146891998173475@gitbox.apache.org> From: omalley@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: orc X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 5242bff6e50879ad5a919e70be023d27e0c16b2d X-Git-Newrev: d657ed4f516d21e3b573c3ec4a4f31867a8ccf70 X-Git-Rev: d657ed4f516d21e3b573c3ec4a4f31867a8ccf70 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. omalley pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/orc.git The following commit(s) were added to refs/heads/master by this push: new d657ed4 ORC-361: Remove the single thread restriction on the MemoryManagerImpl. d657ed4 is described below commit d657ed4f516d21e3b573c3ec4a4f31867a8ccf70 Author: Owen O'Malley AuthorDate: Fri Oct 4 15:08:10 2019 -0700 ORC-361: Remove the single thread restriction on the MemoryManagerImpl. Fixes #433 Signed-off-by: Owen O'Malley --- .../src/java/org/apache/orc/MemoryManager.java | 19 +++- java/core/src/java/org/apache/orc/OrcFile.java | 15 +-- .../org/apache/orc/impl/MemoryManagerImpl.java | 122 +++++---------------- .../src/java/org/apache/orc/impl/WriterImpl.java | 45 +++++--- .../src/test/org/apache/orc/TestVectorOrcFile.java | 100 ++++------------- .../org/apache/orc/impl/TestMemoryManager.java | 14 +-- 6 files changed, 105 insertions(+), 210 deletions(-) diff --git a/java/core/src/java/org/apache/orc/MemoryManager.java b/java/core/src/java/org/apache/orc/MemoryManager.java index 3afd3f5..258f381 100644 --- a/java/core/src/java/org/apache/orc/MemoryManager.java +++ b/java/core/src/java/org/apache/orc/MemoryManager.java @@ -36,10 +36,10 @@ public interface MemoryManager { interface Callback { /** - * The writer needs to check its memory usage + * The scale factor for the stripe size has changed and thus the + * writer should adjust their desired size appropriately. * @param newScale the current scale factor for memory allocations * @return true if the writer was over the limit - * @throws IOException */ boolean checkMemory(double newScale) throws IOException; } @@ -63,6 +63,21 @@ public interface MemoryManager { * Give the memory manager an opportunity for doing a memory check. * @param rows number of rows added * @throws IOException + * @deprecated Use {@link MemoryManager#checkMemory} instead */ void addedRow(int rows) throws IOException; + + /** + * As part of adding rows, the writer calls this method to determine + * if the scale factor has changed. If it has changed, the Callback will be + * called. + * @param previousAllocation the previous allocation + * @param writer the callback to call back into if we need to + * @return the current allocation + */ + default long checkMemory(long previousAllocation, + Callback writer) throws IOException { + addedRow(1024); + return previousAllocation; + } } diff --git a/java/core/src/java/org/apache/orc/OrcFile.java b/java/core/src/java/org/apache/orc/OrcFile.java index 803b609..e1dced0 100644 --- a/java/core/src/java/org/apache/orc/OrcFile.java +++ b/java/core/src/java/org/apache/orc/OrcFile.java @@ -942,19 +942,14 @@ public class OrcFile { return new WriterOptions(tableProperties, conf); } - private static ThreadLocal memoryManager = null; + private static MemoryManager memoryManager = null; - private static synchronized MemoryManager getStaticMemoryManager( - final Configuration conf) { + private static synchronized + MemoryManager getStaticMemoryManager(Configuration conf) { if (memoryManager == null) { - memoryManager = new ThreadLocal() { - @Override - protected MemoryManager initialValue() { - return new MemoryManagerImpl(conf); - } - }; + memoryManager = new MemoryManagerImpl(conf); } - return memoryManager.get(); + return memoryManager; } /** diff --git a/java/core/src/java/org/apache/orc/impl/MemoryManagerImpl.java b/java/core/src/java/org/apache/orc/impl/MemoryManagerImpl.java index ac589a0..4e78450 100644 --- a/java/core/src/java/org/apache/orc/impl/MemoryManagerImpl.java +++ b/java/core/src/java/org/apache/orc/impl/MemoryManagerImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,8 +20,6 @@ package org.apache.orc.impl; import org.apache.orc.MemoryManager; import org.apache.orc.OrcConf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -29,7 +27,7 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicLong; /** * Implements a memory manager that keeps a global context of how many ORC @@ -37,40 +35,20 @@ import java.util.concurrent.locks.ReentrantLock; * dynamic partitions, it is easy to end up with many writers in the same task. * By managing the size of each allocation, we try to cut down the size of each * allocation and keep the task from running out of memory. - * + * * This class is not thread safe, but is re-entrant - ensure creation and all * invocations are triggered from the same thread. */ public class MemoryManagerImpl implements MemoryManager { - private static final Logger LOG = LoggerFactory.getLogger(MemoryManagerImpl.class); - - /** - * How often should we check the memory sizes? Measured in rows added - * to all of the writers. - */ - final long ROWS_BETWEEN_CHECKS; private final long totalMemoryPool; - private final Map writerList = - new HashMap(); - private long totalAllocation = 0; - private double currentScale = 1; - private int rowsAddedSinceCheck = 0; - private final OwnedLock ownerLock = new OwnedLock(); - - @SuppressWarnings("serial") - private static class OwnedLock extends ReentrantLock { - public Thread getOwner() { - return super.getOwner(); - } - } + private final Map writerList = new HashMap<>(); + private final AtomicLong totalAllocation = new AtomicLong(0); private static class WriterInfo { long allocation; - Callback callback; - WriterInfo(long allocation, Callback callback) { + WriterInfo(long allocation) { this.allocation = allocation; - this.callback = callback; } } @@ -80,26 +58,16 @@ public class MemoryManagerImpl implements MemoryManager { * pool. */ public MemoryManagerImpl(Configuration conf) { - double maxLoad = OrcConf.MEMORY_POOL.getDouble(conf); - ROWS_BETWEEN_CHECKS = OrcConf.ROWS_BETWEEN_CHECKS.getLong(conf); - LOG.info(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute() + "=" + ROWS_BETWEEN_CHECKS); - if(ROWS_BETWEEN_CHECKS < 1 || ROWS_BETWEEN_CHECKS > 10000) { - throw new IllegalArgumentException(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute() + "=" - + ROWS_BETWEEN_CHECKS + " is outside valid range [1,10000]."); - } - totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean(). - getHeapMemoryUsage().getMax() * maxLoad); - ownerLock.lock(); + this(Math.round(ManagementFactory.getMemoryMXBean(). + getHeapMemoryUsage().getMax() * OrcConf.MEMORY_POOL.getDouble(conf))); } /** - * Light weight thread-safety check for multi-threaded access patterns + * Create the memory manager + * @param poolSize the size of memory to use */ - private void checkOwner() { - if (!ownerLock.isHeldByCurrentThread()) { - LOG.warn("Owner thread expected {}, got {}", - ownerLock.getOwner(), Thread.currentThread()); - } + public MemoryManagerImpl(long poolSize) { + totalMemoryPool = poolSize; } /** @@ -108,43 +76,32 @@ public class MemoryManagerImpl implements MemoryManager { * @param path the file that is being written * @param requestedAllocation the requested buffer size */ - public void addWriter(Path path, long requestedAllocation, + public synchronized void addWriter(Path path, long requestedAllocation, Callback callback) throws IOException { - checkOwner(); WriterInfo oldVal = writerList.get(path); // this should always be null, but we handle the case where the memory // manager wasn't told that a writer wasn't still in use and the task // starts writing to the same path. if (oldVal == null) { - oldVal = new WriterInfo(requestedAllocation, callback); + oldVal = new WriterInfo(requestedAllocation); writerList.put(path, oldVal); - totalAllocation += requestedAllocation; + totalAllocation.addAndGet(requestedAllocation); } else { // handle a new writer that is writing to the same path - totalAllocation += requestedAllocation - oldVal.allocation; + totalAllocation.addAndGet(requestedAllocation - oldVal.allocation); oldVal.allocation = requestedAllocation; - oldVal.callback = callback; } - updateScale(true); } /** * Remove the given writer from the pool. * @param path the file that has been closed */ - public void removeWriter(Path path) throws IOException { - checkOwner(); + public synchronized void removeWriter(Path path) throws IOException { WriterInfo val = writerList.get(path); if (val != null) { writerList.remove(path); - totalAllocation -= val.allocation; - if (writerList.isEmpty()) { - rowsAddedSinceCheck = 0; - } - updateScale(false); - } - if(writerList.isEmpty()) { - rowsAddedSinceCheck = 0; + totalAllocation.addAndGet(-val.allocation); } } @@ -163,48 +120,29 @@ public class MemoryManagerImpl implements MemoryManager { * available for each writer. */ public double getAllocationScale() { - return currentScale; + long alloc = totalAllocation.get(); + return alloc <= totalMemoryPool ? 1.0 : (double) totalMemoryPool / alloc; } - /** - * Give the memory manager an opportunity for doing a memory check. - * @param rows number of rows added - * @throws IOException - */ @Override public void addedRow(int rows) throws IOException { - rowsAddedSinceCheck += rows; - if (rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) { - notifyWriters(); - } + // PASS } /** - * Notify all of the writers that they should check their memory usage. - * @throws IOException + * Obsolete method left for Hive, which extends this class. + * @deprecated remove this method */ public void notifyWriters() throws IOException { - checkOwner(); - LOG.debug("Notifying writers after " + rowsAddedSinceCheck); - for(WriterInfo writer: writerList.values()) { - boolean flushed = writer.callback.checkMemory(currentScale); - if (LOG.isDebugEnabled() && flushed) { - LOG.debug("flushed " + writer.toString()); - } - } - rowsAddedSinceCheck = 0; + // PASS } - /** - * Update the currentScale based on the current allocation and pool size. - * This also updates the notificationTrigger. - * @param isAllocate is this an allocation? - */ - private void updateScale(boolean isAllocate) throws IOException { - if (totalAllocation <= getTotalMemoryPool()) { - currentScale = 1; - } else { - currentScale = (double) getTotalMemoryPool() / totalAllocation; + @Override + public long checkMemory(long previous, Callback writer) throws IOException { + long current = totalAllocation.get(); + if (current != previous) { + writer.checkMemory(getAllocationScale()); } + return current; } } diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java index 9e65f2c..31a401a 100644 --- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java +++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java @@ -87,7 +87,7 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback { private static final int MIN_ROW_INDEX_STRIDE = 1000; private final Path path; - private long adjustedStripeSize; + private final long stripeSize; private final int rowIndexStride; private final TypeDescription schema; private final PhysicalWriter physicalWriter; @@ -107,6 +107,10 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback { private final TreeWriter treeWriter; private final boolean buildIndex; private final MemoryManager memoryManager; + private long previousAllocation = -1; + private long memoryLimit; + private final long ROWS_PER_CHECK; + private long rowsSinceCheck = 0; private final OrcFile.Version version; private final Configuration conf; private final OrcFile.WriterCallback callback; @@ -178,7 +182,7 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback { } this.writeTimeZone = hasTimestamp(schema); this.useUTCTimeZone = opts.getUseUTCTimestamp(); - this.adjustedStripeSize = opts.getStripeSize(); + this.stripeSize = opts.getStripeSize(); this.version = opts.getVersion(); this.encodingStrategy = opts.getEncodingStrategy(); this.compressionStrategy = opts.getCompressionStrategy(); @@ -212,9 +216,11 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback { MIN_ROW_INDEX_STRIDE); } // ensure that we are able to handle callbacks before we register ourselves - memoryManager.addWriter(path, opts.getStripeSize(), this); + ROWS_PER_CHECK = OrcConf.ROWS_BETWEEN_CHECKS.getLong(conf); + memoryLimit = stripeSize; + memoryManager.addWriter(path, stripeSize, this); LOG.info("ORC writer created for path: {} with stripeSize: {} options: {}", - path, adjustedStripeSize, unencryptedOptions); + path, stripeSize, unencryptedOptions); } //@VisibleForTesting @@ -226,7 +232,7 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback { // sizes. int estBufferSize = (int) (stripeSize / (20L * numColumns)); estBufferSize = getClosestBufferSize(estBufferSize); - return estBufferSize > bs ? bs : estBufferSize; + return Math.min(estBufferSize, bs); } @Override @@ -286,15 +292,22 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback { @Override public boolean checkMemory(double newScale) throws IOException { - long limit = Math.round(adjustedStripeSize * newScale); - long size = treeWriter.estimateMemory(); - if (LOG.isDebugEnabled()) { - LOG.debug("ORC writer " + physicalWriter + " size = " + size + - " limit = " + limit); - } - if (size > limit) { - flushStripe(); - return true; + memoryLimit = Math.round(stripeSize * newScale); + return checkMemory(); + } + + private boolean checkMemory() throws IOException { + if (rowsSinceCheck >= ROWS_PER_CHECK) { + rowsSinceCheck = 0; + long size = treeWriter.estimateMemory(); + if (LOG.isDebugEnabled()) { + LOG.debug("ORC writer " + physicalWriter + " size = " + size + + " limit = " + memoryLimit); + } + if (size > memoryLimit) { + flushStripe(); + return true; + } } return false; } @@ -676,7 +689,9 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback { rowsInStripe += batch.size; treeWriter.writeRootBatch(batch, 0, batch.size); } - memoryManager.addedRow(batch.size); + rowsSinceCheck += batch.size; + previousAllocation = memoryManager.checkMemory(previousAllocation, this); + checkMemory(); } catch (Throwable t) { try { close(); diff --git a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java index 6226719..a27d69a 100644 --- a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java +++ b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java @@ -20,6 +20,7 @@ package org.apache.orc; import org.apache.orc.impl.InStream; import org.apache.orc.impl.KeyProvider; +import org.apache.orc.impl.MemoryManagerImpl; import org.apache.orc.impl.OrcCodecPool; import org.apache.orc.impl.WriterImpl; @@ -2449,54 +2450,30 @@ public class TestVectorOrcFile { new MiddleStruct(inner, inner2), list(), map(inner, inner2)); } - private static class MyMemoryManager implements MemoryManager { - double rate; - Path path = null; - int rows = 0; - Callback callback; - - MyMemoryManager(Configuration conf, long totalSpace, double rate) { - this.rate = rate; - } - - @Override - public void addWriter(Path path, long requestedAllocation, - Callback callback) { - this.path = path; - this.callback = callback; - } - - @Override - public synchronized void removeWriter(Path path) { - this.path = null; - } - + @Test + public void testMemoryManagement() throws Exception { + OrcConf.ROWS_BETWEEN_CHECKS.setLong(conf, 100); + final long POOL_SIZE = 50_000; + TypeDescription schema = createInnerSchema(); + MemoryManagerImpl memoryMgr = new MemoryManagerImpl(POOL_SIZE); - @Override - public void addedRow(int count) throws IOException { - rows += count; - if (rows % 100 == 0) { - callback.checkMemory(rate); - } + // set up 10 files that all request the full size. + MemoryManager.Callback ignore = newScale -> false; + for(int f=0; f < 9; ++f) { + memoryMgr.addWriter(new Path("file-" + f), POOL_SIZE, ignore); } - } - @Test - public void testMemoryManagementV11() throws Exception { - Assume.assumeTrue(fileFormat == OrcFile.Version.V_0_11); - - TypeDescription schema = createInnerSchema(); - MyMemoryManager memory = new MyMemoryManager(conf, 10000, 0.1); Writer writer = OrcFile.createWriter(testFilePath, OrcFile.writerOptions(conf) .setSchema(schema) .compress(CompressionKind.NONE) - .stripeSize(50000) + .stripeSize(POOL_SIZE) .bufferSize(100) .rowIndexStride(0) - .memory(memory) + .memory(memoryMgr) .version(fileFormat)); - assertEquals(testFilePath, memory.path); + // check to make sure it is 10% + assertEquals(0.1, memoryMgr.getAllocationScale(), 0.001); VectorizedRowBatch batch = schema.createRowBatch(); batch.size = 1; for(int i=0; i < 2500; ++i) { @@ -2506,56 +2483,17 @@ public class TestVectorOrcFile { writer.addRowBatch(batch); } writer.close(); - assertEquals(null, memory.path); + assertEquals(0.111, memoryMgr.getAllocationScale(), 0.001); Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs)); int i = 0; for(StripeInformation stripe: reader.getStripes()) { i += 1; assertTrue("stripe " + i + " is too long at " + stripe.getDataLength(), - stripe.getDataLength() < 5000); - } - assertEquals(25, i); - assertEquals(2500, reader.getNumberOfRows()); - } - - @Test - public void testMemoryManagementV12() throws Exception { - Assume.assumeTrue(fileFormat != OrcFile.Version.V_0_11); - TypeDescription schema = createInnerSchema(); - MyMemoryManager memory = new MyMemoryManager(conf, 10000, 0.1); - Writer writer = OrcFile.createWriter(testFilePath, - OrcFile.writerOptions(conf) - .setSchema(schema) - .compress(CompressionKind.NONE) - .stripeSize(50000) - .bufferSize(100) - .rowIndexStride(0) - .memory(memory) - .version(fileFormat)); - VectorizedRowBatch batch = schema.createRowBatch(); - assertEquals(testFilePath, memory.path); - batch.size = 1; - for(int i=0; i < 2500; ++i) { - ((LongColumnVector) batch.cols[0]).vector[0] = i * 300; - ((BytesColumnVector) batch.cols[1]).setVal(0, - Integer.toHexString(10*i).getBytes(StandardCharsets.UTF_8)); - writer.addRowBatch(batch); + stripe.getDataLength() < POOL_SIZE); } - writer.close(); - assertEquals(null, memory.path); - Reader reader = OrcFile.createReader(testFilePath, - OrcFile.readerOptions(conf).filesystem(fs)); - int i = 0; - for(StripeInformation stripe: reader.getStripes()) { - i += 1; - assertTrue(testFilePath + " stripe " + i + " is too long at " + - stripe.getDataLength(), stripe.getDataLength() < 5000); - } - // with HIVE-7832, the dictionaries will be disabled after writing the first - // stripe as there are too many distinct values. Hence only 3 stripes as - // compared to 25 stripes in version 0.11 (above test case) - assertEquals(3, i); + // 0.11 always uses the dictionary, so ends up with a lot more stripes + assertEquals(fileFormat == OrcFile.Version.V_0_11 ? 25 : 3, i); assertEquals(2500, reader.getNumberOfRows()); } diff --git a/java/core/src/test/org/apache/orc/impl/TestMemoryManager.java b/java/core/src/test/org/apache/orc/impl/TestMemoryManager.java index 109b95e..dab59de 100644 --- a/java/core/src/test/org/apache/orc/impl/TestMemoryManager.java +++ b/java/core/src/test/org/apache/orc/impl/TestMemoryManager.java @@ -79,18 +79,12 @@ public class TestMemoryManager { Configuration conf = new Configuration(); conf.set("hive.exec.orc.memory.pool", "0.9"); MemoryManagerImpl mgr = new MemoryManagerImpl(conf); - assertEquals("Wrong default ", - OrcConf.ROWS_BETWEEN_CHECKS.getLong(conf), mgr.ROWS_BETWEEN_CHECKS); long mem = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); System.err.print("Memory = " + mem); long pool = mgr.getTotalMemoryPool(); assertTrue("Pool too small: " + pool, mem * 0.899 < pool); assertTrue("Pool too big: " + pool, pool < mem * 0.901); - - conf.setLong(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), 1234); - mgr = new MemoryManagerImpl(conf); - assertEquals("Wrong default ", 1234, mgr.ROWS_BETWEEN_CHECKS); } private static class DoubleMatcher extends BaseMatcher { @@ -128,12 +122,12 @@ public class TestMemoryManager { calls[i] = Mockito.mock(MemoryManager.Callback.class); mgr.addWriter(new Path(Integer.toString(i)), pool/4, calls[i]); } - // add enough rows to get the memory manager to check the limits - for(int i=0; i < 10000; ++i) { - mgr.addedRow(1); + // check to make sure that they get scaled down + for(int i=0; i < calls.length; ++i) { + mgr.checkMemory(0, calls[i]); } for(int call=0; call < calls.length; ++call) { - Mockito.verify(calls[call], Mockito.times(2)) + Mockito.verify(calls[call]) .checkMemory(Matchers.doubleThat(closeTo(0.2, ERROR))); } }