giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pava...@apache.org
Subject git commit: updated refs/heads/trunk to 58576c8
Date Wed, 18 Jun 2014 22:38:41 GMT
Repository: giraph
Updated Branches:
  refs/heads/trunk ffdddff32 -> 58576c81f


GIRAPH-915: With BigDataIO some messages can get ignored (majakabiljo via pavanka)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/58576c81
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/58576c81
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/58576c81

Branch: refs/heads/trunk
Commit: 58576c81fed9f4eccb03366151c1cf280765d237
Parents: ffdddff
Author: Pavan Kumar <pavanka@fb.com>
Authored: Wed Jun 18 15:38:30 2014 -0700
Committer: Pavan Kumar <pavanka@fb.com>
Committed: Wed Jun 18 15:38:30 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |  2 ++
 .../SendWorkerOneToAllMessagesRequest.java      |  2 +-
 .../org/apache/giraph/edge/ByteArrayEdges.java  |  2 +-
 .../apache/giraph/utils/ByteStructIterator.java |  2 +-
 .../utils/ByteStructVertexIdIterator.java       |  2 +-
 .../utils/ExtendedByteArrayDataInput.java       |  5 ++++
 .../apache/giraph/utils/ExtendedDataInput.java  |  7 +++++
 .../apache/giraph/utils/UnsafeArrayReads.java   |  5 ++++
 .../org/apache/giraph/utils/VertexIterator.java |  2 +-
 .../apache/giraph/utils/io/BigDataInput.java    | 31 +++++++++++++-------
 10 files changed, 44 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/58576c81/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index f35d4ba..659edfd 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-915: With BigDataIO some messages can get ignored (majakabiljo via pavanka)
+
   GIRAPH-918: GIRAPH-908 has a small bug reg counting entries (pavanka)
 
   GIRAPH-842: option to dump histogram of memory usage when heap is low on memory (pavanka)

http://git-wip-us.apache.org/repos/asf/giraph/blob/58576c81/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java
b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java
index 8745adb..5f1ed53 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java
@@ -116,7 +116,7 @@ public class SendWorkerOneToAllMessagesRequest<I extends WritableComparable,
     int idCount = 0;
     int partitionId = 0;
     try {
-      while (reader.available() != 0) {
+      while (!reader.endOfInput()) {
         msg.readFields(reader);
         idCount = reader.readInt();
         for (int i = 0; i < idCount; i++) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/58576c81/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java
index 271e9c5..509546c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java
@@ -159,7 +159,7 @@ public class ByteArrayEdges<I extends WritableComparable, E extends
Writable>
 
     @Override
     public boolean hasNext() {
-      return serializedEdges != null && extendedDataInput.available() > 0;
+      return serializedEdges != null && !extendedDataInput.endOfInput();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/58576c81/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java
index 322365c..1f1b90e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java
@@ -44,7 +44,7 @@ public abstract class ByteStructIterator<T extends Writable> implements
 
   @Override
   public boolean hasNext() {
-    return extendedDataInput.available() > 0;
+    return !extendedDataInput.endOfInput();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/58576c81/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java
b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java
index 3d564cd..3b880a0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java
@@ -54,7 +54,7 @@ public abstract class ByteStructVertexIdIterator<I extends WritableComparable>
 
   @Override
   public boolean hasNext() {
-    return extendedDataInput.available() > 0;
+    return !extendedDataInput.endOfInput();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/58576c81/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
index 3eae25b..56c79c4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
@@ -57,6 +57,11 @@ public class ExtendedByteArrayDataInput extends ByteArrayInputStream
   }
 
   @Override
+  public boolean endOfInput() {
+    return available() == 0;
+  }
+
+  @Override
   public void readFully(byte[] b) throws IOException {
     dataInput.readFully(b);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/58576c81/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataInput.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataInput.java
index f1c6809..96096ed 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataInput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataInput.java
@@ -36,4 +36,11 @@ public interface ExtendedDataInput extends DataInput {
    * @return Bytes available
    */
   int available();
+
+  /**
+   * Check if we read everything from the input
+   *
+   * @return True iff we read everything from the input
+   */
+  boolean endOfInput();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/58576c81/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
index db19fda..1ab8de6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
@@ -84,6 +84,11 @@ public class UnsafeArrayReads extends UnsafeReads {
     return (int) (bufLength - pos);
   }
 
+  @Override
+  public boolean endOfInput() {
+    return available() == 0;
+  }
+
 
   @Override
   public int getPos() {

http://git-wip-us.apache.org/repos/asf/giraph/blob/58576c81/giraph-core/src/main/java/org/apache/giraph/utils/VertexIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIterator.java
index dced9bd..dd73b1f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIterator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIterator.java
@@ -77,7 +77,7 @@ public class VertexIterator<I extends WritableComparable,
    * @return True if the iteration has more elements.
    */
   public boolean hasNext() {
-    return extendedDataInput.available() > 0;
+    return !extendedDataInput.endOfInput();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/58576c81/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java
index f73819a..2454a37 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java
@@ -76,7 +76,7 @@ public class BigDataInput implements ExtendedDataInput {
    * next one if needed.
    */
   private void checkIfShouldMoveToNextDataInput() {
-    if (currentInput.available() == 0) {
+    if (currentInput.endOfInput()) {
       moveToNextDataInput();
     }
   }
@@ -168,12 +168,17 @@ public class BigDataInput implements ExtendedDataInput {
   @Override
   public int skipBytes(int n) throws IOException {
     int bytesLeftToSkip = n;
-    while (bytesLeftToSkip >= currentInput.available()) {
-      bytesLeftToSkip -= currentInput.available();
-      moveToNextDataInput();
+    while (bytesLeftToSkip > 0) {
+      int bytesSkipped = currentInput.skipBytes(bytesLeftToSkip);
+      bytesLeftToSkip -= bytesSkipped;
+      if (bytesLeftToSkip > 0) {
+        moveToNextDataInput();
+        if (endOfInput()) {
+          break;
+        }
+      }
     }
-    int bytesSkipped = currentInput.skipBytes(bytesLeftToSkip);
-    return n - bytesLeftToSkip + bytesSkipped;
+    return n - bytesLeftToSkip;
   }
 
   @Override
@@ -187,10 +192,14 @@ public class BigDataInput implements ExtendedDataInput {
 
   @Override
   public int available() {
-    int available = 0;
-    for (int i = currentPositionInInputs; i < dataInputs.size(); i++) {
-      available += dataInputs.get(i).available();
-    }
-    return available;
+    throw new UnsupportedOperationException("available: " +
+        "Not supported with BigDataIO because overflow can happen");
+  }
+
+  @Override
+  public boolean endOfInput() {
+    return currentInput == EMPTY_INPUT ||
+        (dataInputs.get(currentPositionInInputs).endOfInput() &&
+            currentPositionInInputs == dataInputs.size() - 1);
   }
 }


Mime
View raw message