beam-commits mailing list archives

From dhalp...@apache.org
Subject [1/2] incubator-beam git commit: Some minor changes and fixes for sorter module
Date Tue, 13 Dec 2016 23:55:40 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 0bdf7fc04 -> a5f2df2bd


Some minor changes and fixes for sorter module

Includes:

* Limit max memory for ExternalSorter and BufferedExternalSorter to 2047 MB to prevent int
overflow within Hadoop's sorting library
* Fix int overflow for large memory values in InMemorySorter
* Add note about estimated disk use to README.md
* Configure Hadoop's sorting library to put all temp files under the specified directory
* Have Hadoop clean up the temp directory on exit
* Stop shading Hadoop dependencies. Some context:
** The existing shading is broken (modules that depend on this one cannot use it successfully).
** Hadoop's use of reflection in several places makes shading the dependency cleanly nearly
impossible. It requires a couple of rather brittle hacks, and for clients that depend on certain
conflicting versions of Hadoop, these hacks may fail at their intended goal of preventing
conflicts anyway.
** As far as I can tell, there is no way to shade this dependency that makes it universally
usable, so leaving it unshaded seems like a reasonable default.
** Without shading Hadoop, this module can be used successfully from Beam's wordcount example
(which already has Hadoop dependencies).

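The 2047 MB cap in the first item follows from the range of a Java `int`: 2048 MB is 2048 * 1024 * 1024 = 2^31 bytes, exactly one more than `Integer.MAX_VALUE`. The sketch below (the class `MemoryCapDemo` is hypothetical and does not touch Hadoop) uses `Math.multiplyExact` so the overflow surfaces as an exception instead of a silent wrap.

```java
public class MemoryCapDemo {
  /** Converts megabytes to bytes in int arithmetic, failing loudly instead of wrapping. */
  static int megabytesToIntBytes(int memoryMB) {
    // 1024 * 1024 = 1048576 fits comfortably in an int; only the final product can overflow.
    return Math.multiplyExact(memoryMB, 1024 * 1024);
  }

  public static void main(String[] args) {
    System.out.println(Integer.MAX_VALUE);         // 2147483647
    System.out.println(megabytesToIntBytes(2047)); // 2146435072, the largest value that fits
    try {
      megabytesToIntBytes(2048); // 2048 MB is exactly 2^31 bytes
    } catch (ArithmeticException e) {
      System.out.println("2048 MB overflows an int");
    }
  }
}
```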

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5a04492e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5a04492e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5a04492e

Branch: refs/heads/master
Commit: 5a04492e5b7c5d5b4cb2da0f7a80ed8f0c2f2eb4
Parents: 0bdf7fc
Author: Mitch Shanklin <mshanklin@google.com>
Authored: Wed Nov 9 14:09:49 2016 -0800
Committer: Dan Halperin <dhalperi@google.com>
Committed: Tue Dec 13 15:42:10 2016 -0800

----------------------------------------------------------------------
 sdks/java/extensions/sorter/README.md           |  2 +-
 sdks/java/extensions/sorter/pom.xml             |  8 ++----
 .../sorter/BufferedExternalSorter.java          |  6 ++++-
 .../sdk/extensions/sorter/ExternalSorter.java   | 15 ++++++++++-
 .../sdk/extensions/sorter/InMemorySorter.java   | 26 ++++++++++----------
 .../sorter/BufferedExternalSorterTest.java      | 16 ++++++++++++
 .../extensions/sorter/ExternalSorterTest.java   | 16 ++++++++++++
 .../extensions/sorter/InMemorySorterTest.java   |  8 ++++++
 8 files changed, 75 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a04492e/sdks/java/extensions/sorter/README.md
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/README.md b/sdks/java/extensions/sorter/README.md
index 18bd0d2..6ff3dbe 100644
--- a/sdks/java/extensions/sorter/README.md
+++ b/sdks/java/extensions/sorter/README.md
@@ -22,7 +22,7 @@ This module provides the SortValues transform, which takes a `PCollection<KV<K,
 
 ##Caveats
 * This transform performs value-only sorting; the iterable accompanying each key is sorted, but *there is no relationship between different keys*, as Beam does not support any defined relationship between different elements in a PCollection.
-* Each `Iterable<KV<K2, V>>` is sorted on a single worker using local memory and disk. This means that `SortValues` may be a performance and/or scalability bottleneck when used in different pipelines. For example, users are discouraged from using `SortValues` on a `PCollection` of a single element to globally sort a large `PCollection`.
+* Each `Iterable<KV<K2, V>>` is sorted on a single worker using local memory and disk. This means that `SortValues` may be a performance and/or scalability bottleneck when used in different pipelines. For example, users are discouraged from using `SortValues` on a `PCollection` of a single element to globally sort a large `PCollection`. A (rough) estimate of the number of bytes of disk space utilized if sorting spills to disk is `numRecords * (numSecondaryKeyBytesPerRecord + numValueBytesPerRecord + 16) * 3`.
 
 ##Options
 * The user can customize the temporary location used if sorting requires spilling to disk and the maximum amount of memory to use by creating a custom instance of `BufferedExternalSorter.Options` to pass into `SortValues.create`.

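The README's disk estimate can be evaluated directly. A minimal sketch, assuming only the formula as written in the README: the class and method names are hypothetical, the constant 16 and the factor 3 are taken verbatim, and the result is a rough figure rather than a guarantee. It uses `long` arithmetic so large record counts do not overflow.

```java
public class SpillDiskEstimate {
  /**
   * Rough bytes of disk used if sorting spills, per the README formula
   * numRecords * (numSecondaryKeyBytesPerRecord + numValueBytesPerRecord + 16) * 3.
   */
  static long estimateSpillBytes(
      long numRecords, long numSecondaryKeyBytesPerRecord, long numValueBytesPerRecord) {
    return numRecords * (numSecondaryKeyBytesPerRecord + numValueBytesPerRecord + 16) * 3;
  }

  public static void main(String[] args) {
    // Hypothetical input: 10 million records, 8-byte secondary keys, 100-byte values.
    long bytes = estimateSpillBytes(10_000_000L, 8, 100);
    System.out.println(bytes); // 3720000000
    System.out.printf("about %.2f GiB%n", bytes / (1024.0 * 1024 * 1024));
  }
}
```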
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a04492e/sdks/java/extensions/sorter/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/pom.xml b/sdks/java/extensions/sorter/pom.xml
index a99a793..c8dfd52 100644
--- a/sdks/java/extensions/sorter/pom.xml
+++ b/sdks/java/extensions/sorter/pom.xml
@@ -69,8 +69,6 @@
               <shadeTestJar>true</shadeTestJar>
               <artifactSet>
                 <includes>
-                  <include>org.apache.hadoop:hadoop-mapreduce-client-core</include>
-                  <include>org.apache.hadoop:hadoop-common</include>
                   <include>com.google.guava:guava</include>
                 </includes>
               </artifactSet>
@@ -86,10 +84,6 @@
               </filters>
               <relocations>
                 <relocation>
-                  <pattern>org.apache.hadoop</pattern>
-                  <shadedPattern>org.apache.beam.repackaged.org.apache.hadoop</shadedPattern>
-                </relocation>
-                <relocation>
                   <pattern>com.google.common</pattern>
                   <shadedPattern>org.apache.beam.repackaged.com.google.common</shadedPattern>
                 </relocation>
@@ -117,12 +111,14 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
       <version>${hadoop.version}</version>
+      <scope>provided</scope>
     </dependency>
     
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <version>${hadoop.version}</version>
+      <scope>provided</scope>
     </dependency>
     
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a04492e/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java
index 1dfd339..1a16511 100644
--- a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java
@@ -51,10 +51,14 @@ public class BufferedExternalSorter implements Sorter {
 
     /**
     * Sets the size of the memory buffer in megabytes. This controls both the buffer for initial in
-     * memory sorting and the buffer used when external sorting. Must be greater than zero.
+     * memory sorting and the buffer used when external sorting. Must be greater than zero and less
+     * than 2048.
      */
     public Options setMemoryMB(int memoryMB) {
       checkArgument(memoryMB > 0, "memoryMB must be greater than zero");
+      // Hadoop's external sort stores the number of available memory bytes in an int, this prevents
+      // overflow
+      checkArgument(memoryMB < 2048, "memoryMB must be less than 2048");
       this.memoryMB = memoryMB;
       return this;
     }

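For readers without Guava on hand, the range check added to `setMemoryMB` can be mirrored in plain Java. This is a sketch, not the Beam class: `MemoryOptionsSketch` and its default of 100 MB are hypothetical, and explicit `IllegalArgumentException`s stand in for `checkArgument`, reusing the same messages.

```java
public class MemoryOptionsSketch {
  private int memoryMB = 100; // hypothetical default

  /** Accepts 0 < memoryMB < 2048, mirroring the checks in the patch. */
  public MemoryOptionsSketch setMemoryMB(int memoryMB) {
    if (memoryMB <= 0) {
      throw new IllegalArgumentException("memoryMB must be greater than zero");
    }
    // memoryMB * 1024 * 1024 must fit in an int downstream, so 2047 is the largest legal value.
    if (memoryMB >= 2048) {
      throw new IllegalArgumentException("memoryMB must be less than 2048");
    }
    this.memoryMB = memoryMB;
    return this;
  }

  public int getMemoryMB() {
    return memoryMB;
  }

  public static void main(String[] args) {
    System.out.println(new MemoryOptionsSketch().setMemoryMB(2047).getMemoryMB()); // 2047
    try {
      new MemoryOptionsSketch().setMemoryMB(2048);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // memoryMB must be less than 2048
    }
  }
}
```

Returning `this` keeps the fluent style of the patched `Options` setters.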
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a04492e/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java
index beef1ee..492efba 100644
--- a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java
@@ -81,9 +81,15 @@ class ExternalSorter implements Sorter {
       return tempLocation;
     }
 
-    /** Sets the size of the memory buffer in megabytes. */
+    /**
+     * Sets the size of the memory buffer in megabytes. Must be greater than zero and less than
+     * 2048.
+     */
     public Options setMemoryMB(int memoryMB) {
       checkArgument(memoryMB > 0, "memoryMB must be greater than zero");
+      // Hadoop's external sort stores the number of available memory bytes in an int, this prevents
+      // integer overflow
+      checkArgument(memoryMB < 2048, "memoryMB must be less than 2048");
       this.memoryMB = memoryMB;
       return this;
     }
@@ -137,6 +143,9 @@ class ExternalSorter implements Sorter {
       paths = new Path[] {new Path(tempDir, "test.seq")};
 
       JobConf conf = new JobConf();
+      // Sets directory for intermediate files created during merge of merge sort
+      conf.set("io.seqfile.local.dir", tempDir.toUri().getPath());
+
       writer =
           SequenceFile.createWriter(
               conf,
@@ -146,6 +155,10 @@ class ExternalSorter implements Sorter {
               Writer.compression(CompressionType.NONE));
 
       FileSystem fs = FileSystem.getLocal(conf);
+      // Directory has to exist for Hadoop to recognize it as deletable on exit
+      fs.mkdirs(tempDir);
+      fs.deleteOnExit(tempDir);
+
       sorter =
           new SequenceFile.Sorter(
               fs, new BytesWritable.Comparator(), BytesWritable.class, BytesWritable.class, conf);

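The patch relies on Hadoop's `FileSystem.deleteOnExit`, which deletes registered paths when the `FileSystem` is closed (cached file systems are closed on a clean JVM shutdown); per the patch comment, the directory must already exist to be registered, hence the `mkdirs` call. As a rough plain-JDK analogue, not Hadoop's implementation, the sketch below creates a temp directory and registers a shutdown hook that deletes it recursively. `TempDirCleanup`, `createTempDirDeletedOnExit`, and `deleteRecursively` are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class TempDirCleanup {
  /** Creates a fresh temp directory and registers a JVM shutdown hook that deletes it. */
  static Path createTempDirDeletedOnExit(String prefix) throws IOException {
    Path dir = Files.createTempDirectory(prefix); // exists before the hook is registered
    Runtime.getRuntime().addShutdownHook(new Thread(() -> deleteRecursively(dir)));
    return dir;
  }

  /** Deletes a directory tree, children before parents; a missing path is a no-op. */
  static void deleteRecursively(Path root) {
    if (!Files.exists(root)) {
      return;
    }
    try (Stream<Path> paths = Files.walk(root)) {
      // Reverse lexicographic order visits every child before its parent directory.
      paths.sorted(Comparator.reverseOrder()).forEach(p -> {
        try {
          Files.delete(p);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) throws IOException {
    Path dir = createTempDirDeletedOnExit("sorter-");
    Files.write(dir.resolve("test.seq"), new byte[] {1, 2, 3});
    System.out.println("Spill files go under " + dir);
  }
}
```

Unlike `java.io.File.deleteOnExit`, which cannot remove a non-empty directory, the recursive walk also removes any spill files left inside.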
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a04492e/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/InMemorySorter.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/InMemorySorter.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/InMemorySorter.java
index 9203130..8c635f0 100644
--- a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/InMemorySorter.java
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/InMemorySorter.java
@@ -35,16 +35,16 @@ import org.apache.beam.sdk.values.KV;
 class InMemorySorter implements Sorter {
   /** {@code Options} contains configuration of the sorter. */
   public static class Options implements Serializable {
-    private int memoryMB = 100;
+    private long memoryMB = 100;
 
     /** Sets the size of the memory buffer in megabytes. */
-    public void setMemoryMB(int memoryMB) {
+    public void setMemoryMB(long memoryMB) {
       checkArgument(memoryMB > 0, "memoryMB must be greater than zero");
       this.memoryMB = memoryMB;
     }
 
     /** Returns the configured size of the memory buffer. */
-    public int getMemoryMB() {
+    public long getMemoryMB() {
       return memoryMB;
     }
   }
@@ -53,7 +53,7 @@ class InMemorySorter implements Sorter {
   private static final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();
 
   /** How many bytes per word in the running JVM. Assumes 64 bit/8 bytes if unknown. */
-  private static final int NUM_BYTES_PER_WORD = getNumBytesPerWord();
+  private static final long NUM_BYTES_PER_WORD = getNumBytesPerWord();
 
   /**
   * Estimate of memory overhead per KV record in bytes not including memory associated with keys
@@ -66,13 +66,13 @@ class InMemorySorter implements Sorter {
    *   <li> Per-object overhead (JVM-specific, guessing 2 words * 3 objects)
    * </ul>
    */
-  private static final int RECORD_MEMORY_OVERHEAD_ESTIMATE = 11 * NUM_BYTES_PER_WORD;
+  private static final long RECORD_MEMORY_OVERHEAD_ESTIMATE = 11 * NUM_BYTES_PER_WORD;
 
   /** Maximum size of the buffer in bytes. */
-  private int maxBufferSize;
+  private long maxBufferSize;
 
   /** Current number of stored bytes. Including estimated overhead bytes. */
-  private int numBytes;
+  private long numBytes;
 
   /** Whether sort has been called. */
   private boolean sortCalled;
@@ -82,7 +82,7 @@ class InMemorySorter implements Sorter {
 
   /** Private constructor. */
   private InMemorySorter(Options options) {
-    maxBufferSize = options.getMemoryMB() * 1024 * 1024;
+    maxBufferSize = options.getMemoryMB() * 1024L * 1024L;
   }
 
   /** Create a new sorter from provided options. */
@@ -99,7 +99,7 @@ class InMemorySorter implements Sorter {
   public boolean addIfRoom(KV<byte[], byte[]> record) {
     checkState(!sortCalled, "Records can only be added before sort()");
 
-    int recordBytes = estimateRecordBytes(record);
+    long recordBytes = estimateRecordBytes(record);
     if (roomInBuffer(numBytes + recordBytes, records.size() + 1)) {
       records.add(record);
       numBytes += recordBytes;
@@ -131,7 +131,7 @@ class InMemorySorter implements Sorter {
   * Estimate the number of additional bytes required to store this record. Including the key, the
    * value and any overhead for objects and references.
    */
-  private int estimateRecordBytes(KV<byte[], byte[]> record) {
+  private long estimateRecordBytes(KV<byte[], byte[]> record) {
     return RECORD_MEMORY_OVERHEAD_ESTIMATE + record.getKey().length + record.getValue().length;
   }
 
@@ -139,7 +139,7 @@ class InMemorySorter implements Sorter {
   * Check whether we have room to store the provided total number of bytes and total number of
    * records.
    */
-  private boolean roomInBuffer(int numBytes, int numRecords) {
+  private boolean roomInBuffer(long numBytes, long numRecords) {
     // Collections.sort may allocate up to n/2 extra object references.
     // Also, ArrayList grows by a factor of 1.5x, so there might be up to n/2 null object
     // references in the backing array.
@@ -153,11 +153,11 @@ class InMemorySorter implements Sorter {
   * Returns the number of bytes in a word according to the JVM. Defaults to 8 for 64 bit if answer
    * unknown.
    */
-  private static int getNumBytesPerWord() {
+  private static long getNumBytesPerWord() {
     String bitsPerWord = System.getProperty("sun.arch.data.model");
 
     try {
-      return Integer.parseInt(bitsPerWord) / 8;
+      return Long.parseLong(bitsPerWord) / 8;
     } catch (Exception e) {
       // Can't determine whether 32 or 64 bit, so assume 64
       return 8;

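The `InMemorySorter` change turns on Java's numeric promotion rules: an expression whose operands are all `int` is evaluated in `int` and only then widened, so assigning it to a `long` field does not prevent overflow. A small demonstration, with hypothetical class and method names, contrasts the old `getMemoryMB() * 1024 * 1024` pattern with the `1024L` form used in the patch. (After the patch `getMemoryMB()` already returns `long`, so the `L` suffixes act as an extra safeguard.)

```java
public class WideningDemo {
  /** Old pattern: int * int * int is computed in int and wraps before widening to long. */
  static long bytesWithIntMath(int memoryMB) {
    return memoryMB * 1024 * 1024;
  }

  /** Patched pattern: a long literal promotes the whole product to long arithmetic. */
  static long bytesWithLongMath(int memoryMB) {
    return memoryMB * 1024L * 1024L;
  }

  public static void main(String[] args) {
    System.out.println(bytesWithIntMath(100));   // 104857600, small values are fine
    System.out.println(bytesWithIntMath(4096));  // 0, since 2^32 wraps to zero in int
    System.out.println(bytesWithLongMath(4096)); // 4294967296
  }
}
```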
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a04492e/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorterTest.java b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorterTest.java
index 8c108eb..3d63b1a 100644
--- a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorterTest.java
+++ b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorterTest.java
@@ -214,4 +214,20 @@ public class BufferedExternalSorterTest {
         .setTempLocation(tmpLocation.toString());
     options.setMemoryMB(-1);
   }
+
+  @Test
+  public void testZeroMemory() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("memoryMB must be greater than zero");
+    BufferedExternalSorter.Options options = new BufferedExternalSorter.Options();
+    options.setMemoryMB(0);
+  }
+
+  @Test
+  public void testMemoryTooLarge() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("memoryMB must be less than 2048");
+    BufferedExternalSorter.Options options = new BufferedExternalSorter.Options();
+    options.setMemoryMB(2048);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a04492e/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterTest.java b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterTest.java
index bcfbdad..689dbff 100644
--- a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterTest.java
+++ b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterTest.java
@@ -123,4 +123,20 @@ public class ExternalSorterTest {
     ExternalSorter.Options options = new ExternalSorter.Options();
     options.setMemoryMB(-1);
   }
+
+  @Test
+  public void testZeroMemory() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("memoryMB must be greater than zero");
+    ExternalSorter.Options options = new ExternalSorter.Options();
+    options.setMemoryMB(0);
+  }
+
+  @Test
+  public void testMemoryTooLarge() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("memoryMB must be less than 2048");
+    ExternalSorter.Options options = new ExternalSorter.Options();
+    options.setMemoryMB(2048);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a04492e/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/InMemorySorterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/InMemorySorterTest.java b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/InMemorySorterTest.java
index 867390b..f86f8f5 100644
--- a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/InMemorySorterTest.java
+++ b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/InMemorySorterTest.java
@@ -141,4 +141,12 @@ public class InMemorySorterTest {
     InMemorySorter.Options options = new InMemorySorter.Options();
     options.setMemoryMB(-1);
   }
+
+  @Test
+  public void testZeroMemory() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("memoryMB must be greater than zero");
+    InMemorySorter.Options options = new InMemorySorter.Options();
+    options.setMemoryMB(0);
+  }
 }

