orc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject [orc] branch branch-1.5 updated: ORC-361: Remove the single thread restriction on the MemoryManagerImpl.
Date Tue, 08 Oct 2019 22:58:00 GMT
This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/branch-1.5 by this push:
     new 3aed316  ORC-361: Remove the single thread restriction on the MemoryManagerImpl.
3aed316 is described below

commit 3aed31686676fbff7367e8563b499c0d568d250f
Author: Owen O'Malley <omalley@apache.org>
AuthorDate: Fri Oct 4 15:08:10 2019 -0700

    ORC-361: Remove the single thread restriction on the MemoryManagerImpl.
    
    Fixes #433
    
    Signed-off-by: Owen O'Malley <omalley@apache.org>
---
 .../src/java/org/apache/orc/MemoryManager.java     |  19 +++-
 java/core/src/java/org/apache/orc/OrcFile.java     |  15 +--
 .../org/apache/orc/impl/MemoryManagerImpl.java     | 122 +++++----------------
 .../src/java/org/apache/orc/impl/WriterImpl.java   |  47 +++++---
 .../src/test/org/apache/orc/TestVectorOrcFile.java | 103 ++++-------------
 .../org/apache/orc/impl/TestMemoryManager.java     |  14 +--
 java/pom.xml                                       |   4 +-
 7 files changed, 108 insertions(+), 216 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/MemoryManager.java b/java/core/src/java/org/apache/orc/MemoryManager.java
index 3afd3f5..258f381 100644
--- a/java/core/src/java/org/apache/orc/MemoryManager.java
+++ b/java/core/src/java/org/apache/orc/MemoryManager.java
@@ -36,10 +36,10 @@ public interface MemoryManager {
 
   interface Callback {
     /**
-     * The writer needs to check its memory usage
+     * The scale factor for the stripe size has changed and thus the
+     * writer should adjust its desired size appropriately.
      * @param newScale the current scale factor for memory allocations
      * @return true if the writer was over the limit
-     * @throws IOException
      */
     boolean checkMemory(double newScale) throws IOException;
   }
@@ -63,6 +63,21 @@ public interface MemoryManager {
    * Give the memory manager an opportunity for doing a memory check.
    * @param rows number of rows added
    * @throws IOException
+   * @deprecated Use {@link MemoryManager#checkMemory} instead
    */
   void addedRow(int rows) throws IOException;
+
+  /**
+   * As part of adding rows, the writer calls this method to determine
+   * if the scale factor has changed. If it has changed, the Callback will be
+   * called.
+   * @param previousAllocation the previous allocation
+   * @param writer the callback to call back into if we need to
+   * @return the current allocation
+   */
+  default long checkMemory(long previousAllocation,
+                           Callback writer) throws IOException {
+    addedRow(1024);
+    return previousAllocation;
+  }
 }
diff --git a/java/core/src/java/org/apache/orc/OrcFile.java b/java/core/src/java/org/apache/orc/OrcFile.java
index f9c5f4a..62e6260 100644
--- a/java/core/src/java/org/apache/orc/OrcFile.java
+++ b/java/core/src/java/org/apache/orc/OrcFile.java
@@ -827,19 +827,14 @@ public class OrcFile {
     return new WriterOptions(tableProperties, conf);
   }
 
-  private static ThreadLocal<MemoryManager> memoryManager = null;
+  private static MemoryManager memoryManager = null;
 
-  private static synchronized MemoryManager getStaticMemoryManager(
-      final Configuration conf) {
+  private static synchronized
+  MemoryManager getStaticMemoryManager(Configuration conf) {
     if (memoryManager == null) {
-      memoryManager = new ThreadLocal<MemoryManager>() {
-        @Override
-        protected MemoryManager initialValue() {
-          return new MemoryManagerImpl(conf);
-        }
-      };
+      memoryManager = new MemoryManagerImpl(conf);
     }
-    return memoryManager.get();
+    return memoryManager;
   }
 
   /**
diff --git a/java/core/src/java/org/apache/orc/impl/MemoryManagerImpl.java b/java/core/src/java/org/apache/orc/impl/MemoryManagerImpl.java
index ac589a0..4e78450 100644
--- a/java/core/src/java/org/apache/orc/impl/MemoryManagerImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/MemoryManagerImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,8 +20,6 @@ package org.apache.orc.impl;
 
 import org.apache.orc.MemoryManager;
 import org.apache.orc.OrcConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
@@ -29,7 +27,7 @@ import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Implements a memory manager that keeps a global context of how many ORC
@@ -37,40 +35,20 @@ import java.util.concurrent.locks.ReentrantLock;
  * dynamic partitions, it is easy to end up with many writers in the same task.
  * By managing the size of each allocation, we try to cut down the size of each
  * allocation and keep the task from running out of memory.
- * 
+ *
  * This class is not thread safe, but is re-entrant - ensure creation and all
  * invocations are triggered from the same thread.
  */
 public class MemoryManagerImpl implements MemoryManager {
 
-  private static final Logger LOG = LoggerFactory.getLogger(MemoryManagerImpl.class);
-
-  /**
-   * How often should we check the memory sizes? Measured in rows added
-   * to all of the writers.
-   */
-  final long ROWS_BETWEEN_CHECKS;
   private final long totalMemoryPool;
-  private final Map<Path, WriterInfo> writerList =
-      new HashMap<Path, WriterInfo>();
-  private long totalAllocation = 0;
-  private double currentScale = 1;
-  private int rowsAddedSinceCheck = 0;
-  private final OwnedLock ownerLock = new OwnedLock();
-
-  @SuppressWarnings("serial")
-  private static class OwnedLock extends ReentrantLock {
-    public Thread getOwner() {
-      return super.getOwner();
-    }
-  }
+  private final Map<Path, WriterInfo> writerList = new HashMap<>();
+  private final AtomicLong totalAllocation = new AtomicLong(0);
 
   private static class WriterInfo {
     long allocation;
-    Callback callback;
-    WriterInfo(long allocation, Callback callback) {
+    WriterInfo(long allocation) {
       this.allocation = allocation;
-      this.callback = callback;
     }
   }
 
@@ -80,26 +58,16 @@ public class MemoryManagerImpl implements MemoryManager {
    *             pool.
    */
   public MemoryManagerImpl(Configuration conf) {
-    double maxLoad = OrcConf.MEMORY_POOL.getDouble(conf);
-    ROWS_BETWEEN_CHECKS = OrcConf.ROWS_BETWEEN_CHECKS.getLong(conf);
-    LOG.info(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute() + "=" + ROWS_BETWEEN_CHECKS);
-    if(ROWS_BETWEEN_CHECKS < 1 || ROWS_BETWEEN_CHECKS > 10000) {
-      throw new IllegalArgumentException(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute() + "="
-        + ROWS_BETWEEN_CHECKS + " is outside valid range [1,10000].");
-    }
-    totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().
-        getHeapMemoryUsage().getMax() * maxLoad);
-    ownerLock.lock();
+    this(Math.round(ManagementFactory.getMemoryMXBean().
+        getHeapMemoryUsage().getMax() * OrcConf.MEMORY_POOL.getDouble(conf)));
   }
 
   /**
-   * Light weight thread-safety check for multi-threaded access patterns
+   * Create the memory manager
+   * @param poolSize the size of memory to use
    */
-  private void checkOwner() {
-    if (!ownerLock.isHeldByCurrentThread()) {
-      LOG.warn("Owner thread expected {}, got {}",
-          ownerLock.getOwner(), Thread.currentThread());
-    }
+  public MemoryManagerImpl(long poolSize) {
+    totalMemoryPool = poolSize;
   }
 
   /**
@@ -108,43 +76,32 @@ public class MemoryManagerImpl implements MemoryManager {
    * @param path the file that is being written
    * @param requestedAllocation the requested buffer size
    */
-  public void addWriter(Path path, long requestedAllocation,
+  public synchronized void addWriter(Path path, long requestedAllocation,
                               Callback callback) throws IOException {
-    checkOwner();
     WriterInfo oldVal = writerList.get(path);
     // this should always be null, but we handle the case where the memory
     // manager wasn't told that a writer wasn't still in use and the task
     // starts writing to the same path.
     if (oldVal == null) {
-      oldVal = new WriterInfo(requestedAllocation, callback);
+      oldVal = new WriterInfo(requestedAllocation);
       writerList.put(path, oldVal);
-      totalAllocation += requestedAllocation;
+      totalAllocation.addAndGet(requestedAllocation);
     } else {
       // handle a new writer that is writing to the same path
-      totalAllocation += requestedAllocation - oldVal.allocation;
+      totalAllocation.addAndGet(requestedAllocation - oldVal.allocation);
       oldVal.allocation = requestedAllocation;
-      oldVal.callback = callback;
     }
-    updateScale(true);
   }
 
   /**
    * Remove the given writer from the pool.
    * @param path the file that has been closed
    */
-  public void removeWriter(Path path) throws IOException {
-    checkOwner();
+  public synchronized void removeWriter(Path path) throws IOException {
     WriterInfo val = writerList.get(path);
     if (val != null) {
       writerList.remove(path);
-      totalAllocation -= val.allocation;
-      if (writerList.isEmpty()) {
-        rowsAddedSinceCheck = 0;
-      }
-      updateScale(false);
-    }
-    if(writerList.isEmpty()) {
-      rowsAddedSinceCheck = 0;
+      totalAllocation.addAndGet(-val.allocation);
     }
   }
 
@@ -163,48 +120,29 @@ public class MemoryManagerImpl implements MemoryManager {
    * available for each writer.
    */
   public double getAllocationScale() {
-    return currentScale;
+    long alloc = totalAllocation.get();
+    return alloc <= totalMemoryPool ? 1.0 : (double) totalMemoryPool / alloc;
   }
 
-  /**
-   * Give the memory manager an opportunity for doing a memory check.
-   * @param rows number of rows added
-   * @throws IOException
-   */
   @Override
   public void addedRow(int rows) throws IOException {
-    rowsAddedSinceCheck += rows;
-    if (rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) {
-      notifyWriters();
-    }
+    // deliberately a no-op; memory checks now go through checkMemory
   }
 
   /**
-   * Notify all of the writers that they should check their memory usage.
-   * @throws IOException
+   * Obsolete method left for Hive, which extends this class.
+   * @deprecated remove this method
    */
   public void notifyWriters() throws IOException {
-    checkOwner();
-    LOG.debug("Notifying writers after " + rowsAddedSinceCheck);
-    for(WriterInfo writer: writerList.values()) {
-      boolean flushed = writer.callback.checkMemory(currentScale);
-      if (LOG.isDebugEnabled() && flushed) {
-        LOG.debug("flushed " + writer.toString());
-      }
-    }
-    rowsAddedSinceCheck = 0;
+    // deliberately a no-op
   }
 
-  /**
-   * Update the currentScale based on the current allocation and pool size.
-   * This also updates the notificationTrigger.
-   * @param isAllocate is this an allocation?
-   */
-  private void updateScale(boolean isAllocate) throws IOException {
-    if (totalAllocation <= getTotalMemoryPool()) {
-      currentScale = 1;
-    } else {
-      currentScale = (double) getTotalMemoryPool() / totalAllocation;
+  @Override
+  public long checkMemory(long previous, Callback writer) throws IOException {
+    long current = totalAllocation.get();
+    if (current != previous) {
+      writer.checkMemory(getAllocationScale());
     }
+    return current;
   }
 }
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index 5ec7d70..c7e5818 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -80,7 +80,7 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
   private static final int MIN_ROW_INDEX_STRIDE = 1000;
 
   private final Path path;
-  private long adjustedStripeSize;
+  private final long stripeSize;
   private final int rowIndexStride;
   private final CompressionKind compress;
   private int bufferSize;
@@ -103,6 +103,10 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
   private final TreeWriter treeWriter;
   private final boolean buildIndex;
   private final MemoryManager memoryManager;
+  private long previousAllocation = -1;
+  private long memoryLimit;
+  private final long ROWS_PER_CHECK;
+  private long rowsSinceCheck = 0;
   private final OrcFile.Version version;
   private final Configuration conf;
   private final OrcFile.WriterCallback callback;
@@ -143,7 +147,7 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
     }
     this.writeTimeZone = hasTimestamp(schema);
     this.useUTCTimeZone = opts.getUseUTCTimestamp();
-    this.adjustedStripeSize = opts.getStripeSize();
+    this.stripeSize = opts.getStripeSize();
     this.version = opts.getVersion();
     this.encodingStrategy = opts.getEncodingStrategy();
     this.compressionStrategy = opts.getCompressionStrategy();
@@ -156,7 +160,7 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
       OutStream.assertBufferSizeValid(opts.getBufferSize());
       this.bufferSize = opts.getBufferSize();
     } else {
-      this.bufferSize = getEstimatedBufferSize(adjustedStripeSize,
+      this.bufferSize = getEstimatedBufferSize(stripeSize,
           numColumns, opts.getBufferSize());
     }
     if (version == OrcFile.Version.FUTURE) {
@@ -188,10 +192,12 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
     }
 
     // ensure that we are able to handle callbacks before we register ourselves
-    memoryManager.addWriter(path, opts.getStripeSize(), this);
+    memoryManager.addWriter(path, stripeSize, this);
     LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" +
-        " compression: {} bufferSize: {}", path, adjustedStripeSize, opts.getBlockSize(),
+        " compression: {} bufferSize: {}", path, stripeSize, opts.getBlockSize(),
         compress, bufferSize);
+    ROWS_PER_CHECK = OrcConf.ROWS_BETWEEN_CHECKS.getLong(conf);
+    memoryLimit = stripeSize;
   }
 
   //@VisibleForTesting
@@ -203,7 +209,7 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
     // sizes.
     int estBufferSize = (int) (stripeSize / (20L * numColumns));
     estBufferSize = getClosestBufferSize(estBufferSize);
-    return estBufferSize > bs ? bs : estBufferSize;
+    return Math.min(estBufferSize, bs);
   }
 
   @Override
@@ -260,15 +266,22 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
 
   @Override
   public boolean checkMemory(double newScale) throws IOException {
-    long limit = Math.round(adjustedStripeSize * newScale);
-    long size = treeWriter.estimateMemory();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("ORC writer " + physicalWriter + " size = " + size +
-          " limit = " + limit);
-    }
-    if (size > limit) {
-      flushStripe();
-      return true;
+    memoryLimit = Math.round(stripeSize * newScale);
+    return checkMemory();
+  }
+
+  private boolean checkMemory() throws IOException {
+    if (rowsSinceCheck >= ROWS_PER_CHECK) {
+      rowsSinceCheck = 0;
+      long size = treeWriter.estimateMemory();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("ORC writer " + physicalWriter + " size = " + size +
+                      " limit = " + memoryLimit);
+      }
+      if (size > memoryLimit) {
+        flushStripe();
+        return true;
+      }
     }
     return false;
   }
@@ -570,7 +583,9 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
       rowsInStripe += batch.size;
       treeWriter.writeRootBatch(batch, 0, batch.size);
     }
-    memoryManager.addedRow(batch.size);
+    rowsSinceCheck += batch.size;
+    previousAllocation = memoryManager.checkMemory(previousAllocation, this);
+    checkMemory();
   }
 
   @Override
diff --git a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
index 800fab2..cd795fe 100644
--- a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
+++ b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
@@ -19,6 +19,7 @@
 package org.apache.orc;
 
 import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
+import org.apache.orc.impl.MemoryManagerImpl;
 import org.apache.orc.impl.OrcCodecPool;
 
 import org.apache.orc.impl.PhysicalFsWriter;
@@ -2344,57 +2345,30 @@ public class TestVectorOrcFile {
         new MiddleStruct(inner, inner2), list(), map(inner, inner2));
   }
 
-  private static class MyMemoryManager implements MemoryManager {
-    double rate;
-    Path path = null;
-    long lastAllocation = 0;
-    int rows = 0;
-    Callback callback;
-
-    MyMemoryManager(Configuration conf, long totalSpace, double rate) {
-      this.rate = rate;
-    }
-
-    @Override
-    public void addWriter(Path path, long requestedAllocation,
-                   Callback callback) {
-      this.path = path;
-      this.lastAllocation = requestedAllocation;
-      this.callback = callback;
-    }
-
-    @Override
-    public synchronized void removeWriter(Path path) {
-      this.path = null;
-      this.lastAllocation = 0;
-    }
-
+  @Test
+  public void testMemoryManagement() throws Exception {
+    OrcConf.ROWS_BETWEEN_CHECKS.setLong(conf, 100);
+    final long POOL_SIZE = 50_000;
+    TypeDescription schema = createInnerSchema();
+    MemoryManagerImpl memoryMgr = new MemoryManagerImpl(POOL_SIZE);
 
-    @Override
-    public void addedRow(int count) throws IOException {
-      rows += count;
-      if (rows % 100 == 0) {
-        callback.checkMemory(rate);
-      }
+    // set up 9 placeholder files that all request the full size; the writer below is the 10th.
+    MemoryManager.Callback ignore = newScale -> false;
+    for(int f=0; f < 9; ++f) {
+      memoryMgr.addWriter(new Path("file-" + f), POOL_SIZE, ignore);
     }
-  }
 
-  @Test
-  public void testMemoryManagementV11() throws Exception {
-    Assume.assumeTrue(fileFormat == OrcFile.Version.V_0_11);
-
-    TypeDescription schema = createInnerSchema();
-    MyMemoryManager memory = new MyMemoryManager(conf, 10000, 0.1);
     Writer writer = OrcFile.createWriter(testFilePath,
         OrcFile.writerOptions(conf)
             .setSchema(schema)
             .compress(CompressionKind.NONE)
-            .stripeSize(50000)
+            .stripeSize(POOL_SIZE)
             .bufferSize(100)
             .rowIndexStride(0)
-            .memory(memory)
+            .memory(memoryMgr)
             .version(fileFormat));
-    assertEquals(testFilePath, memory.path);
+    // check to make sure it is 10%
+    assertEquals(0.1, memoryMgr.getAllocationScale(), 0.001);
     VectorizedRowBatch batch = schema.createRowBatch();
     batch.size = 1;
     for(int i=0; i < 2500; ++i) {
@@ -2404,56 +2378,17 @@ public class TestVectorOrcFile {
       writer.addRowBatch(batch);
     }
     writer.close();
-    assertEquals(null, memory.path);
+    assertEquals(0.111, memoryMgr.getAllocationScale(), 0.001);
     Reader reader = OrcFile.createReader(testFilePath,
         OrcFile.readerOptions(conf).filesystem(fs));
     int i = 0;
     for(StripeInformation stripe: reader.getStripes()) {
       i += 1;
       assertTrue("stripe " + i + " is too long at " + stripe.getDataLength(),
-          stripe.getDataLength() < 5000);
-    }
-    assertEquals(25, i);
-    assertEquals(2500, reader.getNumberOfRows());
-  }
-
-  @Test
-  public void testMemoryManagementV12() throws Exception {
-    Assume.assumeTrue(fileFormat != OrcFile.Version.V_0_11);
-    TypeDescription schema = createInnerSchema();
-    MyMemoryManager memory = new MyMemoryManager(conf, 10000, 0.1);
-    Writer writer = OrcFile.createWriter(testFilePath,
-                                         OrcFile.writerOptions(conf)
-                                         .setSchema(schema)
-                                         .compress(CompressionKind.NONE)
-                                         .stripeSize(50000)
-                                         .bufferSize(100)
-                                         .rowIndexStride(0)
-                                         .memory(memory)
-                                         .version(fileFormat));
-    VectorizedRowBatch batch = schema.createRowBatch();
-    assertEquals(testFilePath, memory.path);
-    batch.size = 1;
-    for(int i=0; i < 2500; ++i) {
-      ((LongColumnVector) batch.cols[0]).vector[0] = i * 300;
-      ((BytesColumnVector) batch.cols[1]).setVal(0,
-          Integer.toHexString(10*i).getBytes());
-      writer.addRowBatch(batch);
+          stripe.getDataLength() < POOL_SIZE);
     }
-    writer.close();
-    assertEquals(null, memory.path);
-    Reader reader = OrcFile.createReader(testFilePath,
-        OrcFile.readerOptions(conf).filesystem(fs));
-    int i = 0;
-    for(StripeInformation stripe: reader.getStripes()) {
-      i += 1;
-      assertTrue(testFilePath + " stripe " + i + " is too long at " +
-              stripe.getDataLength(), stripe.getDataLength() < 5000);
-    }
-    // with HIVE-7832, the dictionaries will be disabled after writing the first
-    // stripe as there are too many distinct values. Hence only 3 stripes as
-    // compared to 25 stripes in version 0.11 (above test case)
-    assertEquals(3, i);
+    // 0.11 always uses the dictionary, so ends up with a lot more stripes
+    assertEquals(fileFormat == OrcFile.Version.V_0_11 ? 25 : 3, i);
     assertEquals(2500, reader.getNumberOfRows());
   }
 
diff --git a/java/core/src/test/org/apache/orc/impl/TestMemoryManager.java b/java/core/src/test/org/apache/orc/impl/TestMemoryManager.java
index 109b95e..dab59de 100644
--- a/java/core/src/test/org/apache/orc/impl/TestMemoryManager.java
+++ b/java/core/src/test/org/apache/orc/impl/TestMemoryManager.java
@@ -79,18 +79,12 @@ public class TestMemoryManager {
     Configuration conf = new Configuration();
     conf.set("hive.exec.orc.memory.pool", "0.9");
     MemoryManagerImpl mgr = new MemoryManagerImpl(conf);
-    assertEquals("Wrong default ",
-      OrcConf.ROWS_BETWEEN_CHECKS.getLong(conf), mgr.ROWS_BETWEEN_CHECKS);
     long mem =
         ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
     System.err.print("Memory = " + mem);
     long pool = mgr.getTotalMemoryPool();
     assertTrue("Pool too small: " + pool, mem * 0.899 < pool);
     assertTrue("Pool too big: " + pool, pool < mem * 0.901);
-
-    conf.setLong(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), 1234);
-    mgr = new MemoryManagerImpl(conf);
-    assertEquals("Wrong default ", 1234, mgr.ROWS_BETWEEN_CHECKS);
   }
 
   private static class DoubleMatcher extends BaseMatcher<Double> {
@@ -128,12 +122,12 @@ public class TestMemoryManager {
       calls[i] = Mockito.mock(MemoryManager.Callback.class);
       mgr.addWriter(new Path(Integer.toString(i)), pool/4, calls[i]);
     }
-    // add enough rows to get the memory manager to check the limits
-    for(int i=0; i < 10000; ++i) {
-      mgr.addedRow(1);
+    // check to make sure that they get scaled down
+    for(int i=0; i < calls.length; ++i) {
+      mgr.checkMemory(0, calls[i]);
     }
     for(int call=0; call < calls.length; ++call) {
-      Mockito.verify(calls[call], Mockito.times(2))
+      Mockito.verify(calls[call])
           .checkMemory(Matchers.doubleThat(closeTo(0.2, ERROR)));
     }
   }
diff --git a/java/pom.xml b/java/pom.xml
index 1ae2b46..0e50cb6 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -268,8 +268,8 @@
           <artifactId>maven-compiler-plugin</artifactId>
           <version>3.1</version>
           <configuration>
-            <source>1.7</source>
-            <target>1.7</target>
+            <source>1.8</source>
+            <target>1.8</target>
           </configuration>
         </plugin>
         <plugin>


Mime
View raw message