nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeongy...@apache.org
Subject [incubator-nemo] 40/40: revert partition level read
Date Fri, 06 Apr 2018 02:36:20 GMT
This is an automated email from the ASF dual-hosted git repository.

jeongyoon pushed a commit to branch skew_exp
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git

commit b4af16ef99daecdf7a81094a8d285a01dba5132e
Author: sanha <sanhaleehana@naver.com>
AuthorDate: Sun Mar 25 01:27:56 2018 +0900

    revert partition level read
---
 .../runtime/executor/data/BlockManagerWorker.java  |  40 ++++----
 .../runtime/executor/data/block/FileBlock.java     | 113 ---------------------
 2 files changed, 17 insertions(+), 136 deletions(-)

diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index ac2d209..ded05ce 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -162,32 +162,26 @@ public final class BlockManagerWorker {
     final Optional<Block> optionalBlock = store.readBlock(blockId);
 
     if (optionalBlock.isPresent()) {
-      if (blockStore.equals(DataStoreProperty.Value.LocalFileStore)) {
-        final DataUtil.IteratorWithNumBytes itr = ((FileBlock) optionalBlock.get()).readLazily(keyRange);
-        handleUsedData(blockStore, blockId); // TODO #?: okay? condition?
-        return CompletableFuture.completedFuture(itr);
-      } else {
-        final Iterable<NonSerializedPartition> partitions = optionalBlock.get().readPartitions(keyRange);
-        handleUsedData(blockStore, blockId);
-
-        // Block resides in this evaluator!
+      final Iterable<NonSerializedPartition> partitions = optionalBlock.get().readPartitions(keyRange);
+      handleUsedData(blockStore, blockId);
+
+      // Block resides in this evaluator!
+      try {
+        final Iterator innerIterator = DataUtil.concatNonSerPartitions(partitions).iterator();
+        long numSerializedBytes = 0;
+        long numEncodedBytes = 0;
         try {
-          final Iterator innerIterator = DataUtil.concatNonSerPartitions(partitions).iterator();
-          long numSerializedBytes = 0;
-          long numEncodedBytes = 0;
-          try {
-            for (final NonSerializedPartition partition : partitions) {
-              numSerializedBytes += partition.getNumSerializedBytes();
-              numEncodedBytes += partition.getNumEncodedBytes();
-            }
-            return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator, numSerializedBytes,
-                numEncodedBytes));
-          } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
-            return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator));
+          for (final NonSerializedPartition partition : partitions) {
+            numSerializedBytes += partition.getNumSerializedBytes();
+            numEncodedBytes += partition.getNumEncodedBytes();
           }
-        } catch (final IOException e) {
-          throw new BlockFetchException(e);
+          return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator, numSerializedBytes,
+              numEncodedBytes));
+        } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
+          return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator));
         }
+      } catch (final IOException e) {
+        throw new BlockFetchException(e);
       }
     } else {
       // We don't have the block here...
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
index a901ed3..51fbdef 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
@@ -159,119 +159,6 @@ public final class FileBlock<K extends Serializable> extends AbstractBlock<K> {
   }
 
   /**
-   * WOW.
-   */
-  private final class LazyIterator implements Iterator, AutoCloseable, DataUtil.IteratorWithNumBytes {
-
-    private final KeyRange keyRange;
-    private final FileInputStream fileInputStream;
-    private final Iterator<PartitionMetadata<K>> partitionMetadataItr;
-    private final Serializer serializerToUse;
-    private int readablePartitions;
-    private Iterator currentIterator;
-    private long numSerializedBytes;
-
-    private LazyIterator(final KeyRange keyRange) throws BlockFetchException {
-      try {
-        this.keyRange = keyRange;
-        this.fileInputStream = new FileInputStream(filePath);
-        this.partitionMetadataItr = metadata.getPartitionMetadataList().iterator();
-        this.readablePartitions = metadata.getPartitionMetadataList().size();
-        this.serializerToUse = metadata.isReadAsBytes()
-            ? SerializerManager.getAsBytesSerializer() : getSerializer();
-        this.currentIterator = null;
-        this.numSerializedBytes = 0;
-      } catch (final IOException e) {
-        throw new BlockFetchException(e);
-      }
-    }
-
-    @Override
-    public boolean hasNext() {
-      try {
-        if (currentIterator == null) {
-          while (readablePartitions > 0) {
-            readablePartitions--;
-            final PartitionMetadata<K> pMetadata = partitionMetadataItr.next();
-            final K key = pMetadata.getKey();
-            if (keyRange.includes(key)) {
-              // The key value of this partition is in the range.
-              final long availableBefore = fileInputStream.available();
-              // We need to limit read bytes on this FileStream, which could be over-read by wrapped
-              // compression stream. We recommend to wrap with LimitedInputStream once more when
-              // reading input from chained compression InputStream.
-              // Plus, this stream must be not closed to prevent to close the filtered file partition.
-              final int length = pMetadata.getPartitionSize();
-              this.numSerializedBytes += length;
-              final LimitedInputStream limitedInputStream = new LimitedInputStream(fileInputStream, length);
-              final NonSerializedPartition<K> deserializePartition =
-                  DataUtil.deserializePartition(length, serializerToUse, key, limitedInputStream);
-              // rearrange file pointer
-              final long toSkip = pMetadata.getPartitionSize() - availableBefore + fileInputStream.available();
-              if (toSkip > 0) {
-                skipBytes(fileInputStream, toSkip);
-              } else if (toSkip < 0) {
-                throw new IOException("file stream has been overread");
-              }
-              currentIterator = deserializePartition.getData().iterator();
-              if (currentIterator.hasNext()) {
-                return true;
-              } else {
-                currentIterator = null;
-              }
-            } else {
-              // Have to skip this partition.
-              skipBytes(fileInputStream, pMetadata.getPartitionSize());
-            }
-          }
-          return false;
-        } else {
-          return true;
-        }
-      } catch (final IOException e) {
-        throw new BlockFetchException(e);
-      }
-    }
-
-    @Override
-    public Object next() {
-      if (hasNext()) {
-        final Object value = currentIterator.next();
-        if (!currentIterator.hasNext()) {
-          currentIterator = null;
-        }
-        return value;
-      } else {
-        throw new NoSuchElementException();
-      }
-    }
-
-    @Override
-    public void close() throws IOException {
-      fileInputStream.close();
-    }
-
-    @Override
-    public long getNumSerializedBytes() throws NumBytesNotSupportedException {
-      return numSerializedBytes;
-    }
-
-    @Override
-    public long getNumEncodedBytes() throws NumBytesNotSupportedException {
-      throw new NumBytesNotSupportedException();
-    }
-
-  }
-
-  public DataUtil.IteratorWithNumBytes readLazily(final KeyRange keyRange) throws BlockFetchException {
-    if (!metadata.isCommitted()) {
-      throw new BlockFetchException(new Throwable("Cannot retrieve elements before a block is committed"));
-    } else {
-      return new LazyIterator(keyRange);
-    }
-  }
-
-  /**
    * Retrieves the partitions of this block from the file in a specific key range and deserializes it.
    *
    * @param keyRange the key range.

-- 
To stop receiving notification emails like this one, please contact
jeongyoon@apache.org.

Mime
View raw message