nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeongy...@apache.org
Subject [incubator-nemo] 16/40: merge #27
Date Fri, 06 Apr 2018 02:35:56 GMT
This is an automated email from the ASF dual-hosted git repository.

jeongyoon pushed a commit to branch skew_exp
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git

commit e7c3d69b7db58b9972fe78bf2e5798ed2ab6951a
Merge: 26ad2a3 f5103a3
Author: sanha <sanhaleehana@naver.com>
AuthorDate: Sun Mar 11 20:05:25 2018 +0900

    merge #27

 .../edu/snu/nemo/common/test/ExampleTestUtil.java  |  42 ++--
 .../reshaping/DataSkewReshapingPass.java           |   4 +-
 .../beam/AlternatingLeastSquareITCase.java         |   8 +-
 .../snu/nemo/examples/beam/BroadcastITCase.java    |   8 +-
 .../snu/nemo/examples/beam/MapReduceITCase.java    |   8 +-
 .../edu/snu/nemo/examples/spark/SparkITCase.java   |   8 +-
 .../runtime/common/optimizer/RuntimeOptimizer.java |  10 +-
 .../pass/runtime/DataSkewRuntimePass.java          |  52 ++--
 runtime/common/src/main/proto/ControlMessage.proto |   7 +-
 .../executor/bytetransfer/ByteOutputContext.java   |   2 +-
 .../runtime/executor/data/BlockManagerWorker.java  | 130 +++++-----
 .../snu/nemo/runtime/executor/data/DataUtil.java   |  13 +-
 .../runtime/executor/data/SerializedPartition.java |  84 -------
 .../nemo/runtime/executor/data/block/Block.java    |  74 ++++--
 .../runtime/executor/data/block/FileBlock.java     | 280 +++++++++++++++------
 .../data/block/NonSerializedMemoryBlock.java       | 131 +++++++---
 .../executor/data/block/SerializedMemoryBlock.java | 151 ++++++++---
 .../executor/data/metadata/FileMetadata.java       |  14 +-
 .../executor/data/metadata/RemoteFileMetadata.java |   2 +-
 .../{ => partition}/NonSerializedPartition.java    |  70 ++++--
 .../executor/data/{ => partition}/Partition.java   |  21 +-
 .../data/partition/SerializedPartition.java        | 175 +++++++++++++
 .../data/partitioner/DataSkewHashPartitioner.java  |  56 ++---
 .../executor/data/partitioner/HashPartitioner.java |  40 ++-
 .../data/partitioner/IntactPartitioner.java        |  17 +-
 .../executor/data/partitioner/Partitioner.java     |  20 +-
 .../executor/data/stores/AbstractBlockStore.java   |   8 +-
 .../runtime/executor/data/stores/BlockStore.java   | 115 +++------
 .../runtime/executor/data/stores/FileStore.java    |  36 ---
 .../executor/data/stores/GlusterFileStore.java     | 149 ++---------
 .../executor/data/stores/LocalBlockStore.java      | 104 +-------
 .../executor/data/stores/LocalFileStore.java       |  64 +++--
 .../runtime/executor/data/stores/MemoryStore.java  |  33 ++-
 .../executor/data/stores/RemoteFileStore.java      |   2 +-
 .../data/stores/SerializedMemoryStore.java         |  32 ++-
 .../runtime/executor/datatransfer/InputReader.java |   2 -
 .../executor/datatransfer/OutputWriter.java        | 186 +++++---------
 .../edu/snu/nemo/runtime/master/RuntimeMaster.java |  23 +-
 .../pass/runtime/DataSkewRuntimePassTest.java      |  30 ++-
 .../runtime/executor/data/BlockStoreTest.java      | 100 ++++----
 .../executor/datatransfer/DataTransferTest.java    |   1 -
 41 files changed, 1260 insertions(+), 1052 deletions(-)

diff --cc runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
index 174eb4e,be63883..d140c0e
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
@@@ -38,15 -48,18 +48,19 @@@ public final class FileBlock<K extends 
  
    /**
     * Constructor.
 +   * If write (or read) as bytes is enabled, data written to (read from) the block does
not (de)serialized.
     *
+    * @param blockId    the ID of this block.
     * @param serializer the {@link Serializer}.
     * @param filePath   the path of the file that this block will be stored.
 -   * @param metadata   the metadata for this block.
 +   * @param metadata   the metadata for this block.\
     */
-   public FileBlock(final Serializer serializer,
+   public FileBlock(final String blockId,
+                    final Serializer serializer,
                     final String filePath,
                     final FileMetadata<K> metadata) {
+     this.id = blockId;
+     this.nonCommittedPartitionsMap = new HashMap<>();
      this.serializer = serializer;
      this.filePath = filePath;
      this.metadata = metadata;
@@@ -74,18 -86,54 +87,55 @@@
    }
  
    /**
+    * Writes an element to non-committed block.
+    * Invariant: This should not be invoked after this block is committed.
+    * Invariant: This method does not support concurrent write.
+    *
+    * @param key     the key.
+    * @param element the element to write.
+    * @throws BlockWriteException for any error occurred while trying to write a block.
+    */
+   @Override
+   public void write(final K key,
+                     final Object element) throws BlockWriteException {
+     if (metadata.isCommitted()) {
+       throw new BlockWriteException(new Throwable("The partition is already committed!"));
+     } else {
+       try {
+         SerializedPartition<K> partition = nonCommittedPartitionsMap.get(key);
+         if (partition == null) {
+           partition = new SerializedPartition<>(key, serializer);
+           nonCommittedPartitionsMap.put(key, partition);
+         }
+         partition.write(element);
+       } catch (final IOException e) {
+         throw new BlockWriteException(e);
+       }
+     }
+   }
+ 
+   /**
     * Writes {@link NonSerializedPartition}s to this block.
+    * Invariant: This method does not support concurrent write.
     *
     * @param partitions the {@link NonSerializedPartition}s to write.
-    * @throws IOException if fail to write.
+    * @throws BlockWriteException for any error occurred while trying to write a block.
     */
    @Override
-   public Optional<List<Long>> putPartitions(final Iterable<NonSerializedPartition<K>>
partitions) throws IOException {
-     final Iterable<SerializedPartition<K>> convertedPartitions = metadata.isWriteAsBytes()
?
-         DataUtil.convertToSerPartitions(SerializerManager.getAsBytesSerializer(), partitions)
:
-         DataUtil.convertToSerPartitions(serializer, partitions);
- 
-     return Optional.of(putSerializedPartitions(convertedPartitions));
 -  public void writePartitions(final Iterable<NonSerializedPartition<K>> partitions)
 -      throws BlockWriteException {
++  public void writePartitions(final Iterable<NonSerializedPartition<K>> partitions)
throws BlockWriteException {
+     if (metadata.isCommitted()) {
+       throw new BlockWriteException(new Throwable("The partition is already committed!"));
+     } else {
+       try {
++        final Serializer serializerToUse = metadata.isWriteAsBytes() ?
++            SerializerManager.getAsBytesSerializer() : serializer; // TODO: just use byte[]
+         final Iterable<SerializedPartition<K>> convertedPartitions =
 -            DataUtil.convertToSerPartitions(serializer, partitions);
++            DataUtil.convertToSerPartitions(serializerToUse, partitions);
+         writeSerializedPartitions(convertedPartitions);
+       } catch (final IOException e) {
+         throw new BlockWriteException(e);
+       }
+     }
    }
  
    /**
@@@ -111,45 -162,51 +164,53 @@@
     *
     * @param keyRange the key range.
     * @return an iterable of {@link NonSerializedPartition}s.
-    * @throws IOException if failed to retrieve.
+    * @throws BlockFetchException for any error occurred while trying to fetch a block.
     */
    @Override
-   public Iterable<NonSerializedPartition<K>> getPartitions(final KeyRange keyRange)
throws IOException {
-     // Deserialize the data
-     final Serializer serializerToUse = metadata.isReadAsBytes() ?
-         SerializerManager.getAsBytesSerializer() : serializer;
-     final List<NonSerializedPartition<K>> deserializedPartitions = new ArrayList<>();
-     try (final FileInputStream fileStream = new FileInputStream(filePath)) {
-       for (final PartitionMetadata<K> partitionMetadata : metadata.getPartitionMetadataIterable())
{
-         final K key = partitionMetadata.getKey();
-         if (keyRange.includes(key)) {
-           // The key value of this partition is in the range.
-           final long availableBefore = fileStream.available();
-           // We need to limit read bytes on this FileStream, which could be over-read by
wrapped
-           // compression stream. We recommend to wrap with LimitedInputStream once more
when
-           // reading input from chained compression InputStream.
-           // Plus, this stream must be not closed to prevent to close the filtered file
partition.
-           final LimitedInputStream limitedInputStream =
-               new LimitedInputStream(fileStream, partitionMetadata.getPartitionSize());
-           final NonSerializedPartition<K> deserializePartition =
-               DataUtil.deserializePartition(
-                   partitionMetadata.getElementsTotal(), serializerToUse, key, limitedInputStream);
-           deserializedPartitions.add(deserializePartition);
-           // rearrange file pointer
-           final long toSkip = partitionMetadata.getPartitionSize() - availableBefore + fileStream.available();
-           if (toSkip > 0) {
-             skipBytes(fileStream, toSkip);
-           } else if (toSkip < 0) {
-             throw new IOException("file stream has been overread");
+   public Iterable<NonSerializedPartition<K>> readPartitions(final KeyRange keyRange)
throws BlockFetchException {
+     if (!metadata.isCommitted()) {
+       throw new BlockFetchException(new Throwable("Cannot retrieve elements before a block
is committed"));
+     } else {
+       // Deserialize the data
+       final List<NonSerializedPartition<K>> deserializedPartitions = new ArrayList<>();
++      final Serializer serializerToUse = metadata.isReadAsBytes() ?
++          SerializerManager.getAsBytesSerializer() : serializer;
+       try {
+         try (final FileInputStream fileStream = new FileInputStream(filePath)) {
+           for (final PartitionMetadata<K> partitionMetadata : metadata.getPartitionMetadataList())
{
+             final K key = partitionMetadata.getKey();
+             if (keyRange.includes(key)) {
+               // The key value of this partition is in the range.
+               final long availableBefore = fileStream.available();
+               // We need to limit read bytes on this FileStream, which could be over-read
by wrapped
+               // compression stream. We recommend to wrap with LimitedInputStream once more
when
+               // reading input from chained compression InputStream.
+               // Plus, this stream must be not closed to prevent to close the filtered file
partition.
+               final LimitedInputStream limitedInputStream =
+                   new LimitedInputStream(fileStream, partitionMetadata.getPartitionSize());
+               final NonSerializedPartition<K> deserializePartition =
+                   DataUtil.deserializePartition(
 -                      partitionMetadata.getElementsTotal(), serializer, key, limitedInputStream);
++                      partitionMetadata.getElementsTotal(), serializerToUse, key, limitedInputStream);
// TODO: byte[]
+               deserializedPartitions.add(deserializePartition);
+               // rearrange file pointer
+               final long toSkip = partitionMetadata.getPartitionSize() - availableBefore
+ fileStream.available();
+               if (toSkip > 0) {
+                 skipBytes(fileStream, toSkip);
+               } else if (toSkip < 0) {
+                 throw new IOException("file stream has been overread");
+               }
+             } else {
+               // Have to skip this partition.
+               skipBytes(fileStream, partitionMetadata.getPartitionSize());
+             }
            }
-         } else {
-           // Have to skip this partition.
-           skipBytes(fileStream, partitionMetadata.getPartitionSize());
          }
+       } catch (final IOException e) {
+         throw new BlockFetchException(e);
        }
-     }
  
-     return deserializedPartitions;
+       return deserializedPartitions;
+     }
    }
  
    /**
diff --cc runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
index b6ad306,21d2873..113a347
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
@@@ -30,33 -30,57 +30,66 @@@ import java.util.*
  
  /**
   * This class represents a block which is stored in local memory and not serialized.
+  * Concurrent read is supported, but concurrent write is not supported.
+  *
   * @param <K> the key type of its partitions.
   */
- @ThreadSafe
+ @NotThreadSafe
  public final class NonSerializedMemoryBlock<K extends Serializable> implements Block<K>
{
  
+   private final String id;
    private final List<NonSerializedPartition<K>> nonSerializedPartitions;
+   private final Map<K, NonSerializedPartition<K>> nonCommittedPartitionsMap;
    private final Serializer serializer;
 +  private final boolean readAsBytes;
 +  private final boolean writeAsBytes;
    private volatile boolean committed;
  
    /**
     * Constructor.
 +   * If write (or read) as bytes is enabled, data written to (read from) this block does
not (de)serialized.
     *
 -   * @param blockId    the ID of this block.
 -   * @param serializer the {@link Serializer}.
++   * @param blockId      the ID of this block.
 +   * @param serializer   the {@link Serializer}.
 +   * @param readAsBytes  whether read data from this block as arrays of bytes or not.
 +   * @param writeAsBytes whether write data to this block as arrays of bytes or not.
     */
-   public NonSerializedMemoryBlock(final Serializer serializer,
+   public NonSerializedMemoryBlock(final String blockId,
 -                                  final Serializer serializer) {
++                                  final Serializer serializer,
 +                                  final boolean readAsBytes,
 +                                  final boolean writeAsBytes) {
+     this.id = blockId;
      this.nonSerializedPartitions = new ArrayList<>();
+     this.nonCommittedPartitionsMap = new HashMap<>();
      this.serializer = serializer;
-     this.committed = false;
 +    this.readAsBytes = readAsBytes;
 +    this.writeAsBytes = writeAsBytes;
+     this.committed = false;
+   }
+ 
+   /**
+    * Writes an element to non-committed block.
+    * Invariant: This should not be invoked after this block is committed.
+    * Invariant: This method does not support concurrent write.
+    *
+    * @param key     the key.
+    * @param element the element to write.
+    * @throws BlockWriteException for any error occurred while trying to write a block.
+    */
+   @Override
+   public void write(final K key,
+                     final Object element) throws BlockWriteException {
+     if (committed) {
+       throw new BlockWriteException(new Throwable("The partition is already committed!"));
+     } else {
+       try {
+         final NonSerializedPartition<K> partition =
+             nonCommittedPartitionsMap.computeIfAbsent(key, absentKey -> new NonSerializedPartition<>(key));
+         partition.write(element);
+       } catch (final IOException e) {
+         throw new BlockWriteException(e);
+       }
+     }
    }
  
    /**
diff --cc runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
index 1687f8e,228157a..2630b26
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
@@@ -30,33 -30,60 +30,69 @@@ import java.util.*
  
  /**
   * This class represents a block which is serialized and stored in local memory.
+  * Concurrent read is supported, but concurrent write is not supported.
+  *
   * @param <K> the key type of its partitions.
   */
- @ThreadSafe
+ @NotThreadSafe
  public final class SerializedMemoryBlock<K extends Serializable> implements Block<K>
{
  
+   private final String id;
    private final List<SerializedPartition<K>> serializedPartitions;
+   private final Map<K, SerializedPartition<K>> nonCommittedPartitionsMap;
    private final Serializer serializer;
 +  private final boolean readAsBytes;
 +  private final boolean writeAsBytes;
    private volatile boolean committed;
  
    /**
     * Constructor.
 +   * If write (or read) as bytes is enabled, data written to (read from) this block does
not (de)serialized.
     *
 -   * @param blockId    the ID of this block.
 -   * @param serializer the {@link Serializer}.
++   * @param blockId      the ID of this block.
 +   * @param serializer   the {@link Serializer}.
 +   * @param readAsBytes  whether read data from this block as arrays of bytes or not.
 +   * @param writeAsBytes whether write data to this block as arrays of bytes or not.
     */
-   public SerializedMemoryBlock(final Serializer serializer,
+   public SerializedMemoryBlock(final String blockId,
 -                               final Serializer serializer) {
++                               final Serializer serializer,
 +                               final boolean readAsBytes,
 +                               final boolean writeAsBytes) {
+     this.id = blockId;
      this.serializedPartitions = new ArrayList<>();
+     this.nonCommittedPartitionsMap = new HashMap<>();
      this.serializer = serializer;
-     this.committed = false;
 +    this.readAsBytes = readAsBytes;
 +    this.writeAsBytes = writeAsBytes;
+     this.committed = false;
+   }
+ 
+   /**
+    * Writes an element to non-committed block.
+    * Invariant: This should not be invoked after this block is committed.
+    * Invariant: This method does not support concurrent write.
+    *
+    * @param key     the key.
+    * @param element the element to write.
+    * @throws BlockWriteException for any error occurred while trying to write a block.
+    */
+   @Override
+   public void write(final K key,
+                     final Object element) throws BlockWriteException {
+     if (committed) {
+       throw new BlockWriteException(new Throwable("The partition is already committed!"));
+     } else {
+       try {
+         SerializedPartition<K> partition = nonCommittedPartitionsMap.get(key);
+         if (partition == null) {
+           partition = new SerializedPartition<>(key, serializer);
+           nonCommittedPartitionsMap.put(key, partition);
+         }
+         partition.write(element);
+       } catch (final IOException e) {
+         throw new BlockWriteException(e);
+       }
+     }
    }
  
    /**
diff --cc runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java
index fdfa075,df33b51..cae38b2
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java
@@@ -125,16 -107,9 +125,22 @@@ public abstract class FileMetadata<K ex
    }
  
    /**
 +   * @return whether read data from this file as arrays of bytes or not.
 +   */
 +  public boolean isReadAsBytes() {
 +    return readAsBytes;
 +  }
 +
 +  /**
 +   * @return whether write to this file as arrays of bytes or not.
 +   */
 +  public boolean isWriteAsBytes() {
 +    return writeAsBytes;
 +  }
++  /**
+    * @return whether this file is committed or not.
+    */
+   public final boolean isCommitted() {
+     return committed.get();
+   }
  }
diff --cc runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/AbstractBlockStore.java
index ad77f03,880ab1a..ec71b25
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/AbstractBlockStore.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/AbstractBlockStore.java
@@@ -17,6 -17,6 +17,7 @@@ package edu.snu.nemo.runtime.executor.d
  
  import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
  import edu.snu.nemo.runtime.executor.data.SerializerManager;
++import edu.snu.nemo.runtime.executor.data.block.Block;
  import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
  
  /**
@@@ -35,17 -35,6 +36,18 @@@ public abstract class AbstractBlockStor
    }
  
    /**
 +   * Creates a new block.
 +   *
 +   * @param blockId the ID of the block to create.
-    * @see BlockStore#createBlock(String)
++   * @return the created block.
++   * @see BlockStore#createBlock(String).
 +   */
 +  @Override
-   public void createBlock(final String blockId) {
-     createBlock(blockId, false, false);
++  public Block createBlock(final String blockId) {
++    return createBlock(blockId, false, false);
 +  }
 +
 +  /**
     * Gets data coder for a block from the {@link SerializerManager}.
     *
     * @param blockId the ID of the block to get the coder.
diff --cc runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/BlockStore.java
index 4473048,75b2d26..1648778
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/BlockStore.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/BlockStore.java
@@@ -34,110 -30,39 +30,57 @@@ public interface BlockStore 
     * A stale data created by previous failed task should be handled during the creation
of new block.
     *
     * @param blockId the ID of the block to create.
+    * @return the created block.
     * @throws BlockWriteException for any error occurred while trying to create a block.
-    *         (This exception will be thrown to the scheduler
-    *          through {@link edu.snu.nemo.runtime.executor.Executor} and
-    *          have to be handled by the scheduler with fault tolerance mechanism.)
+    *                             (This exception will be thrown to the scheduler
+    *                             through {@link edu.snu.nemo.runtime.executor.Executor}
and
+    *                             have to be handled by the scheduler with fault tolerance
mechanism.)
     */
-   void createBlock(String blockId) throws BlockWriteException;
+   Block createBlock(String blockId) throws BlockWriteException;
  
    /**
 +   * Creates a new block.
 +   * A stale data created by previous failed task should be handled during the creation
of new block.
 +   * If write (or read) as bytes is enabled, data written to (read from) the block does
not (de)serialized.
 +   *
 +   * @param blockId      the ID of the block to create.
 +   * @param readAsBytes  whether read data as arrays of bytes or not.
 +   * @param writeAsBytes whether write data as arrays of bytes or not.
++   * @return the created block.
 +   * @throws BlockWriteException for any error occurred while trying to create a block.
 +   *         (This exception will be thrown to the scheduler
 +   *          through {@link edu.snu.nemo.runtime.executor.Executor} and
 +   *          have to be handled by the scheduler with fault tolerance mechanism.)
 +   */
-   void createBlock(String blockId,
-                    boolean readAsBytes,
-                    boolean writeAsBytes) throws BlockWriteException;
++  Block createBlock(String blockId,
++                    boolean readAsBytes,
++                    boolean writeAsBytes) throws BlockWriteException;
 +
 +  /**
-    * Saves an iterable of {@link NonSerializedPartition}s to a block.
-    * If the block exists already, appends the data to it.
-    * Invariant: This method may not support concurrent write for a single block.
-    *            Only one thread have to write at once.
+    * Writes a committed block to this store.
     *
-    * @param blockId    of the block.
-    * @param partitions to save to a block.
-    * @param <K>        the key type of the partitions.
-    * @return the size of the data per partition (only when the data is serialized).
-    * @throws BlockWriteException for any error occurred while trying to write a block.
-    *         (This exception will be thrown to the scheduler
-    *          through {@link edu.snu.nemo.runtime.executor.Executor} and
-    *          have to be handled by the scheduler with fault tolerance mechanism.)
+    * @param block the block to write.
+    * @throws BlockWriteException if fail to write.
+    *                             (This exception will be thrown to the scheduler
+    *                             through {@link edu.snu.nemo.runtime.executor.Executor}
and
+    *                             have to be handled by the scheduler with fault tolerance
mechanism.)
     */
-   <K extends Serializable> Optional<List<Long>> putPartitions(String blockId,
-                                      Iterable<NonSerializedPartition<K>> partitions)
throws BlockWriteException;
+   void writeBlock(Block block) throws BlockWriteException;
  
    /**
-    * Saves an iterable of {@link SerializedPartition}s to a block.
-    * If the block exists already, appends the data to it.
-    * Invariant: This method may not support concurrent write for a single block.
-    *            Only one thread have to write at once.
+    * Reads a committed block from this store.
     *
-    * @param blockId    of the block.
-    * @param partitions to save to a block.
-    * @param <K>        the key type of the partitions.
-    * @return the size of the data per partition (only when the data is serialized).
-    * @throws BlockWriteException for any error occurred while trying to write a block.
-    *         (This exception will be thrown to the scheduler
-    *          through {@link edu.snu.nemo.runtime.executor.Executor} and
-    *          have to be handled by the scheduler with fault tolerance mechanism.)
-    */
-   <K extends Serializable> List<Long> putSerializedPartitions(String blockId,
-                                      Iterable<SerializedPartition<K>> partitions)
throws BlockWriteException;
- 
-   /**
-    * Retrieves {@link NonSerializedPartition}s.
-    * They belong to a specific {@link edu.snu.nemo.runtime.common.data.KeyRange} from a
block.
-    *
-    * @param blockId  of the target partition.
-    * @param keyRange the key range.
-    * @param <K>      the key type of the partitions.
-    * @return the result elements from the target block (if the target block exists).
+    * @param blockId of the target partition.
+    * @return the target block (if it exists).
     * @throws BlockFetchException for any error occurred while trying to fetch a block.
-    *         (This exception will be thrown to the scheduler
-    *          through {@link edu.snu.nemo.runtime.executor.Executor} and
-    *          have to be handled by the scheduler with fault tolerance mechanism.)
-    */
-   <K extends Serializable> Optional<Iterable<NonSerializedPartition<K>>>
getPartitions(String blockId,
-                                                            KeyRange<K> keyRange) throws
BlockFetchException;
- 
-   /**
-    * Retrieves {@link SerializedPartition}s in a specific {@link KeyRange} from a block.
-    *
-    * @param blockId   of the target block.
-    * @param keyRange the key range.
-    * @param <K> the key type of the partitions.
-    * @return the result elements from the target block (if the target block exists).
-    * @throws BlockFetchException for any error occurred while trying to fetch a partition.
-    *         (This exception will be thrown to the scheduler
-    *          through {@link edu.snu.nemo.runtime.executor.Executor} and
-    *          have to be handled by the scheduler with fault tolerance mechanism.)
+    *                             (This exception will be thrown to the scheduler
+    *                             through {@link edu.snu.nemo.runtime.executor.Executor}
and
+    *                             have to be handled by the scheduler with fault tolerance
mechanism.)
     */
-   <K extends Serializable> Optional<Iterable<SerializedPartition<K>>>
getSerializedPartitions(String blockId,
-                                                                      KeyRange<K> keyRange)
throws BlockFetchException;
+   Optional<Block> readBlock(String blockId) throws BlockFetchException;
  
    /**
-    * Notifies that all writes for a block is end.
-    *
-    * @param blockId of the block.
-    * @throws BlockWriteException if fail to commit.
-    *         (This exception will be thrown to the scheduler
-    *          through {@link edu.snu.nemo.runtime.executor.Executor} and
-    *          have to be handled by the scheduler with fault tolerance mechanism.)
-    */
-   void commitBlock(String blockId) throws BlockWriteException;
- 
-   /**
-    * Removes a block of data.
+    * Deletes a block from this store.
     *
     * @param blockId of the block.
     * @return whether the partition exists or not.
diff --cc runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/GlusterFileStore.java
index d44dbd4,9af5524..fcf83cf
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/GlusterFileStore.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/GlusterFileStore.java
@@@ -66,62 -60,16 +60,18 @@@ public final class GlusterFileStore ext
    }
  
    /**
-    * Creates a new block.
-    * If write (or read) as bytes is enabled, data written to (read from) the block does
not (de)serialized.
-    *
-    * @param blockId      the ID of the block to create.
-    * @param readAsBytes  whether read data as arrays of bytes or not.
-    * @param writeAsBytes whether write data as arrays of bytes or not.
-    * @see BlockStore#createBlock(String)
 -   * @see BlockStore#createBlock(String).
++   * @see BlockStore#createBlock(String, boolean, boolean).
     */
    @Override
-   public void createBlock(final String blockId,
-                           final boolean readAsBytes,
-                           final boolean writeAsBytes) {
-     removeBlock(blockId);
 -  public Block createBlock(final String blockId) {
++  public Block createBlock(final String blockId,
++                           final boolean readAsBytes,
++                           final boolean writeAsBytes) {
+     deleteBlock(blockId);
      final Serializer serializer = getSerializerFromWorker(blockId);
      final String filePath = DataUtil.blockIdToFilePath(blockId, fileDirectory);
      final RemoteFileMetadata metadata =
 -        RemoteFileMetadata.create(DataUtil.blockIdToMetaFilePath(blockId, fileDirectory));
 +        RemoteFileMetadata.create(DataUtil.blockIdToMetaFilePath(blockId, fileDirectory),
readAsBytes, writeAsBytes);
-     final FileBlock block = new FileBlock<>(serializer, filePath, metadata);
-     blockMap.put(blockId, block);
-   }
- 
-   /**
-    * @see BlockStore#putPartitions(String, Iterable)
-    */
-   @Override
-   public <K extends Serializable>
-   Optional<List<Long>> putPartitions(final String blockId,
-                                      final Iterable<NonSerializedPartition<K>>
partitions)
-       throws BlockWriteException {
-     try {
-       final Block<K> block = blockMap.get(blockId);
-       if (block == null) {
-         throw new BlockWriteException(new Throwable("The block " + blockId + "is not created
yet."));
-       }
-       return block.putPartitions(partitions);
-     } catch (final IOException e) {
-       throw new BlockWriteException(new Throwable("Failed to store partitions to this block."));
-     }
-   }
- 
-   /**
-    * @see BlockStore#putSerializedPartitions(String, Iterable)
-    */
-   @Override
-   public <K extends Serializable>
-   List<Long> putSerializedPartitions(final String blockId,
-                                      final Iterable<SerializedPartition<K>>
partitions) {
-     try {
-       final Block<K> block = blockMap.get(blockId);
-       if (block == null) {
-         throw new BlockWriteException(new Throwable("The block " + blockId + "is not created
yet."));
-       }
-       return block.putSerializedPartitions(partitions);
-     } catch (final IOException e) {
-       throw new BlockWriteException(new Throwable("Failed to store partitions to this block."));
-     }
+     return new FileBlock<>(blockId, serializer, filePath, metadata);
    }
  
    /**
diff --cc runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/LocalFileStore.java
index 9ec0a22,689e3fe..4a0cfae
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/LocalFileStore.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/LocalFileStore.java
@@@ -51,26 -51,34 +51,35 @@@ public final class LocalFileStore exten
    }
  
    /**
-    * Creates a new block.
-    * If write (or read) as bytes is enabled, data written to (read from) the block does
not (de)serialized.
-    *
-    * @param blockId      the ID of the block to create.
-    * @param readAsBytes  whether read data as arrays of bytes or not.
-    * @param writeAsBytes whether write data as arrays of bytes or not.
-    * @see BlockStore#createBlock(String)
 -   * @see BlockStore#createBlock(String).
++   * @see BlockStore#createBlock(String, boolean, boolean).
     */
    @Override
-   public void createBlock(final String blockId,
-                           final boolean readAsBytes,
-                           final boolean writeAsBytes) {
-     removeBlock(blockId);
- 
 -  public Block createBlock(final String blockId) {
++  public Block createBlock(final String blockId,
++                           final boolean readAsBytes,
++                           final boolean writeAsBytes) {
+     deleteBlock(blockId);
 -
      final Serializer serializer = getSerializerFromWorker(blockId);
 -    final LocalFileMetadata metadata = new LocalFileMetadata();
 +    final LocalFileMetadata metadata = new LocalFileMetadata(readAsBytes, writeAsBytes);
  
-     final FileBlock block =
-         new FileBlock(serializer, DataUtil.blockIdToFilePath(blockId, fileDirectory), metadata);
-     getBlockMap().put(blockId, block);
+     return new FileBlock(blockId, serializer, DataUtil.blockIdToFilePath(blockId, fileDirectory),
metadata);
+   }
+ 
+   /**
+    * Writes a committed block to this store.
+    *
+    * @param block the block to write.
+    * @throws BlockWriteException if fail to write.
+    */
+   @Override
+   public void writeBlock(final Block block) throws BlockWriteException {
+     if (!(block instanceof FileBlock)) {
+       throw new BlockWriteException(new Throwable(
+           this.toString() + "only accept " + FileBlock.class.getName()));
+     } else if (!block.isCommitted()) {
+       throw new BlockWriteException(new Throwable("The block " + block.getId() + "is not
committed yet."));
+     } else {
+       getBlockMap().put(block.getId(), block);
+     }
    }
  
    /**
diff --cc runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/MemoryStore.java
index 4791b1d,d2ffa28..d60e79e
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/MemoryStore.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/MemoryStore.java
@@@ -39,14 -42,12 +42,14 @@@ public final class MemoryStore extends 
    }
  
    /**
 -   * @see BlockStore#createBlock(String)
 +   * @see BlockStore#createBlock(String, boolean, boolean).
     */
    @Override
-   public void createBlock(final String blockId,
-                           final boolean readAsBytes,
-                           final boolean writeAsBytes) {
 -  public NonSerializedMemoryBlock createBlock(final String blockId) {
++  public Block createBlock(final String blockId,
++                           final boolean readAsBytes,
++                           final boolean writeAsBytes) {
      final Serializer serializer = getSerializerFromWorker(blockId);
-     getBlockMap().put(blockId, new NonSerializedMemoryBlock(serializer, readAsBytes, writeAsBytes));
 -    return new NonSerializedMemoryBlock(blockId, serializer);
++    return new NonSerializedMemoryBlock(serializer, readAsBytes, writeAsBytes);
    }
  
    /**
diff --cc runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/SerializedMemoryStore.java
index 7ee1d81,dce9bed..b125096
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/SerializedMemoryStore.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/SerializedMemoryStore.java
@@@ -39,14 -40,12 +41,14 @@@ public final class SerializedMemoryStor
    }
  
    /**
 -   * @see BlockStore#createBlock(String)
 +   * @see BlockStore#createBlock(String, boolean, boolean).
     */
    @Override
-   public void createBlock(final String blockId,
-                           final boolean readAsBytes,
-                           final boolean writeAsBytes) {
 -  public Block createBlock(final String blockId) {
++  public Block createBlock(final String blockId,
++                           final boolean readAsBytes,
++                           final boolean writeAsBytes) {
      final Serializer serializer = getSerializerFromWorker(blockId);
-     getBlockMap().put(blockId, new SerializedMemoryBlock(serializer, readAsBytes, writeAsBytes));
 -    return new SerializedMemoryBlock(blockId, serializer);
++    return new SerializedMemoryBlock(serializer, readAsBytes, writeAsBytes);
    }
  
    /**
diff --cc tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
index 70379ee,70379ee..0d1b611
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
@@@ -99,7 -99,7 +99,6 @@@ import static org.mockito.Mockito.mock
  public final class DataTransferTest {
    private static final String EXECUTOR_ID_PREFIX = "Executor";
    private static final int EXECUTOR_CAPACITY = 1;
--  private static final int MAX_SCHEDULE_ATTEMPT = 2;
    private static final int SCHEDULE_TIMEOUT = 1000;
    private static final DataStoreProperty.Value MEMORY_STORE = DataStoreProperty.Value.MemoryStore;
    private static final DataStoreProperty.Value SER_MEMORY_STORE = DataStoreProperty.Value.SerializedMemoryStore;

-- 
To stop receiving notification emails like this one, please contact
jeongyoon@apache.org.

Mime
View raw message