giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject git commit: updated refs/heads/trunk to 0a82257
Date Wed, 10 Aug 2016 21:00:26 GMT
Repository: giraph
Updated Branches:
  refs/heads/trunk a85c41e30 -> 0a8225712


GIRAPH-1104: NegativeArraySizeException in BigDataOutput

Summary:
BigDataIO does not properly handle large byte[] arrays written to it. Split them into chunks
when needed so that no single data output exceeds the maximum size.
Even with D61791 applied, the job was still failing with the same exception.

Test Plan: The job that was failing because of large edges now works; also added a test.

Differential Revision: https://reviews.facebook.net/D61839


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/0a822571
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/0a822571
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/0a822571

Branch: refs/heads/trunk
Commit: 0a8225712e4b4a4471c5efb1ca11e97e2ddbd71e
Parents: a85c41e
Author: Maja Kabiljo <majakabiljo@fb.com>
Authored: Wed Aug 10 12:56:19 2016 -0700
Committer: Maja Kabiljo <majakabiljo@fb.com>
Committed: Wed Aug 10 12:58:38 2016 -0700

----------------------------------------------------------------------
 .../apache/giraph/utils/io/BigDataInput.java    | 13 +++--
 .../apache/giraph/utils/io/BigDataOutput.java   | 17 +++++--
 .../apache/giraph/utils/io/TestBigDataIO.java   | 53 ++++++++++++++++++++
 3 files changed, 77 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/0a822571/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java
index 2454a37..c8251b1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java
@@ -83,14 +83,21 @@ public class BigDataInput implements ExtendedDataInput {
 
   @Override
   public void readFully(byte[] b) throws IOException {
-    checkIfShouldMoveToNextDataInput();
-    currentInput.readFully(b);
+    readFully(b, 0, b.length);
   }
 
   @Override
   public void readFully(byte[] b, int off, int len) throws IOException {
     checkIfShouldMoveToNextDataInput();
-    currentInput.readFully(b, off, len);
+    int available = currentInput.available();
+    if (len <= available) {
+      currentInput.readFully(b, off, len);
+    } else {
+      // When we are trying to read more bytes than there are in single chunk
+      // we need to read part by part
+      currentInput.readFully(b, off, available);
+      readFully(b, off + available, len - available);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/0a822571/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java
index 094e4a5..f5637be 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java
@@ -115,7 +115,7 @@ public class BigDataOutput implements DataOutput, Writable {
    * @return DataOutput which data should be written to
    */
   private ExtendedDataOutput getDataOutputToWriteTo(int additionalSize) {
-    if (currentDataOutput.getPos() + additionalSize >= getMaxSize()) {
+    if (currentDataOutput.getPos() + additionalSize > getMaxSize()) {
       if (dataOutputs == null) {
         dataOutputs = new ArrayList<>(1);
       }
@@ -175,12 +175,23 @@ public class BigDataOutput implements DataOutput, Writable {
 
   @Override
   public void write(byte[] b) throws IOException {
-    getDataOutputToWriteTo(b.length + SIZE_DELTA).write(b);
+    write(b, 0, b.length);
   }
 
   @Override
   public void write(byte[] b, int off, int len) throws IOException {
-    getDataOutputToWriteTo(len + SIZE_DELTA).write(b, off, len);
+    if (len <= getMaxSize()) {
+      getDataOutputToWriteTo(len).write(b, off, len);
+    } else {
+      // When we try to write more bytes than the biggest size of single data
+      // output, we need to split up the byte array into multiple chunks
+      while (len > 0) {
+        int toWrite = Math.min(getMaxSize(), len);
+        write(b, off, toWrite);
+        len -= toWrite;
+        off += toWrite;
+      }
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/0a822571/giraph-core/src/test/java/org/apache/giraph/utils/io/TestBigDataIO.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/io/TestBigDataIO.java b/giraph-core/src/test/java/org/apache/giraph/utils/io/TestBigDataIO.java
new file mode 100644
index 0000000..19e48fa
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/io/TestBigDataIO.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils.io;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TestBigDataIO {
+  @Test
+  public void testLargeByteArrays() throws IOException {
+    ImmutableClassesGiraphConfiguration conf =
+        new ImmutableClassesGiraphConfiguration(new Configuration());
+    BigDataOutput output = new BigDataOutput(conf);
+    byte[] byteArray1 = new byte[(1 << 28) + 3];
+    int pos1 = (1 << 27) + 2;
+    byteArray1[pos1] = 17;
+    output.write(byteArray1);
+    Assert.assertEquals(9, output.getNumberOfDataOutputs());
+    byte[] byteArray2 = new byte[(1 << 27) - 1];
+    int pos2 = (1 << 26) + 5;
+    byteArray2[pos2] = 13;
+    output.write(byteArray2);
+    Assert.assertEquals(13, output.getNumberOfDataOutputs());
+
+    BigDataInput in = new BigDataInput(output);
+    byte[] byteArray3 = new byte[byteArray1.length];
+    in.readFully(byteArray3);
+    Assert.assertEquals(byteArray1[pos1], byteArray3[pos1]);
+    byte[] byteArray4 = new byte[byteArray2.length];
+    in.readFully(byteArray4);
+    Assert.assertEquals(byteArray2[pos2], byteArray4[pos2]);
+  }
+}


Mime
View raw message