nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeongy...@apache.org
Subject [incubator-nemo] 35/40: make sailfish to read data in partition level
Date Fri, 06 Apr 2018 02:36:15 GMT
This is an automated email from the ASF dual-hosted git repository.

jeongyoon pushed a commit to branch skew_exp
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git

commit 3dff237e43f6cbe3ecf1728c97042f9da44b163d
Author: sanha <sanhaleehana@naver.com>
AuthorDate: Fri Mar 23 15:48:59 2018 +0900

    make sailfish to read data in partition level
---
 .../runtime/executor/data/BlockManagerWorker.java  |  40 ++++----
 .../runtime/executor/data/block/FileBlock.java     | 111 +++++++++++++++++++++
 2 files changed, 134 insertions(+), 17 deletions(-)

diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index ded05ce..66628db 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -162,26 +162,32 @@ public final class BlockManagerWorker {
     final Optional<Block> optionalBlock = store.readBlock(blockId);
 
     if (optionalBlock.isPresent()) {
-      final Iterable<NonSerializedPartition> partitions = optionalBlock.get().readPartitions(keyRange);
-      handleUsedData(blockStore, blockId);
-
-      // Block resides in this evaluator!
-      try {
-        final Iterator innerIterator = DataUtil.concatNonSerPartitions(partitions).iterator();
-        long numSerializedBytes = 0;
-        long numEncodedBytes = 0;
+      if (blockStore.equals(DataStoreProperty.Value.LocalFileStore)) {
+        final DataUtil.IteratorWithNumBytes itr = ((FileBlock) optionalBlock.get()).readLazily(keyRange);
+        handleUsedData(blockStore, blockId);
+        return CompletableFuture.completedFuture(itr);
+      } else {
+        final Iterable<NonSerializedPartition> partitions = optionalBlock.get().readPartitions(keyRange);
+        handleUsedData(blockStore, blockId);
+
+        // Block resides in this evaluator!
         try {
-          for (final NonSerializedPartition partition : partitions) {
-            numSerializedBytes += partition.getNumSerializedBytes();
-            numEncodedBytes += partition.getNumEncodedBytes();
+          final Iterator innerIterator = DataUtil.concatNonSerPartitions(partitions).iterator();
+          long numSerializedBytes = 0;
+          long numEncodedBytes = 0;
+          try {
+            for (final NonSerializedPartition partition : partitions) {
+              numSerializedBytes += partition.getNumSerializedBytes();
+              numEncodedBytes += partition.getNumEncodedBytes();
+            }
+            return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator,
numSerializedBytes,
+                numEncodedBytes));
+          } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
+            return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator));
           }
-          return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator,
numSerializedBytes,
-              numEncodedBytes));
-        } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
-          return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator));
+        } catch (final IOException e) {
+          throw new BlockFetchException(e);
         }
-      } catch (final IOException e) {
-        throw new BlockFetchException(e);
       }
     } else {
       // We don't have the block here...
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
index 51fbdef..02046d7 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
@@ -158,6 +158,117 @@ public final class FileBlock<K extends Serializable> extends AbstractBlock<K> {
     }
   }
 
+  private class LazyIterator implements Iterator, AutoCloseable, DataUtil.IteratorWithNumBytes
{
+
+    final KeyRange keyRange;
+    final FileInputStream fileInputStream;
+    final Iterator<PartitionMetadata<K>> partitionMetadataItr;
+    final Serializer serializerToUse;
+    int readablePartitions;
+    Iterator currentIterator;
+    long numSerializedBytes;
+
+    private LazyIterator(final KeyRange keyRange) throws BlockFetchException {
+      try {
+        this.keyRange = keyRange;
+        this.fileInputStream = new FileInputStream(filePath);
+        this.partitionMetadataItr = metadata.getPartitionMetadataList().iterator();
+        this.readablePartitions = metadata.getPartitionMetadataList().size();
+        this.serializerToUse = metadata.isReadAsBytes()
+            ? SerializerManager.getAsBytesSerializer() : getSerializer();
+        this.currentIterator = null;
+        this.numSerializedBytes = 0;
+      } catch (final IOException e) {
+        throw new BlockFetchException(e);
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      try {
+        if (currentIterator == null) {
+          while (readablePartitions > 0) {
+            readablePartitions--;
+            final PartitionMetadata<K> pMetadata = partitionMetadataItr.next();
+            final K key = pMetadata.getKey();
+            if (keyRange.includes(key)) {
+              // The key value of this partition is in the range.
+              final long availableBefore = fileInputStream.available();
+              // We need to limit read bytes on this FileStream, which could be over-read
by wrapped
+              // compression stream. We recommend to wrap with LimitedInputStream once more
when
+              // reading input from chained compression InputStream.
+              // Plus, this stream must be not closed to prevent to close the filtered file
partition.
+              final int length = pMetadata.getPartitionSize();
+              this.numSerializedBytes += length;
+              final LimitedInputStream limitedInputStream = new LimitedInputStream(fileInputStream,
length);
+              final NonSerializedPartition<K> deserializePartition =
+                  DataUtil.deserializePartition(length, serializerToUse, key, limitedInputStream);
+              // rearrange file pointer
+              final long toSkip = pMetadata.getPartitionSize() - availableBefore + fileInputStream.available();
+              if (toSkip > 0) {
+                skipBytes(fileInputStream, toSkip);
+              } else if (toSkip < 0) {
+                throw new IOException("file stream has been overread");
+              }
+              currentIterator = deserializePartition.getData().iterator();
+              if (currentIterator.hasNext()) {
+                return true;
+              } else {
+                currentIterator = null;
+              }
+            } else {
+              // Have to skip this partition.
+              skipBytes(fileInputStream, pMetadata.getPartitionSize());
+            }
+          }
+          return false;
+        } else {
+          return true;
+        }
+      } catch (final IOException e) {
+        throw new BlockFetchException(e);
+      }
+    }
+
+    @Override
+    public Object next() {
+      if (hasNext()) {
+        final Object value = currentIterator.next();
+        if (!currentIterator.hasNext()) {
+          currentIterator = null;
+        }
+        return value;
+      } else {
+        throw new NoSuchElementException();
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      fileInputStream.close();
+    }
+
+    @Override
+    public long getNumSerializedBytes() throws NumBytesNotSupportedException {
+      return numSerializedBytes;
+    }
+
+    @Override
+    public long getNumEncodedBytes() throws NumBytesNotSupportedException {
+      throw new NumBytesNotSupportedException();
+    }
+
+  }
+
+  public DataUtil.IteratorWithNumBytes readLazily(final KeyRange keyRange) throws BlockFetchException
{
+    // TODO: handle used data?
+    if (!metadata.isCommitted()) {
+      throw new BlockFetchException(new Throwable("Cannot retrieve elements before a block
is committed"));
+    } else {
+      return new LazyIterator(keyRange);
+    }
+  }
+
   /**
    * Retrieves the partitions of this block from the file in a specific key range and deserializes
it.
    *

-- 
To stop receiving notification emails like this one, please contact
jeongyoon@apache.org.

Mime
View raw message