nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeongy...@apache.org
Subject [incubator-nemo] 15/40: ckpt
Date Fri, 06 Apr 2018 02:35:55 GMT
This is an automated email from the ASF dual-hosted git repository.

jeongyoon pushed a commit to branch skew_exp
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git

commit 26ad2a37ffa85807f3c9651914fa0908ff6c7e39
Author: sanha <sanhaleehana@naver.com>
AuthorDate: Sat Mar 10 02:33:59 2018 +0900

    ckpt
---
 .../ir/edge/executionproperty/AsBytesProperty.java |  4 +--
 .../runtime/executor/data/SerializerManager.java   | 10 ++++++
 .../runtime/executor/data/block/FileBlock.java     | 16 +++++----
 .../data/block/NonSerializedMemoryBlock.java       | 13 ++++++--
 .../executor/data/block/SerializedMemoryBlock.java | 17 +++++++---
 .../executor/data/metadata/FileMetadata.java       | 38 ++++++++++++++++++++--
 .../executor/data/metadata/LocalFileMetadata.java  |  9 +++--
 .../executor/data/metadata/RemoteFileMetadata.java | 35 ++++++++++++++++----
 .../executor/data/stores/AbstractBlockStore.java   | 13 +++++++-
 .../runtime/executor/data/stores/BlockStore.java   | 17 ++++++++++
 .../executor/data/stores/GlusterFileStore.java     | 11 +++++--
 .../executor/data/stores/LocalFileStore.java       | 11 +++++--
 .../runtime/executor/data/stores/MemoryStore.java  |  8 +++--
 .../data/stores/SerializedMemoryStore.java         |  9 +++--
 14 files changed, 172 insertions(+), 39 deletions(-)

diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/AsBytesProperty.java
b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/AsBytesProperty.java
index a0e2e7d..13ab03c 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/AsBytesProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/AsBytesProperty.java
@@ -42,8 +42,8 @@ public final class AsBytesProperty extends ExecutionProperty<AsBytesProperty.Val
 
   /**
    * Possible values of AsBytes ExecutionProperty.
-   * When an edge is annotated as Write(or Reda)AsBytes,
-   * the writing (or reading) Task writes (or reads) data as arrays of bytes, instead of
(de)serialized form.
+   * When an edge is annotated as Write(or Read)AsBytes,
+   * the writing (reading) Task writes (reads) data as arrays of bytes, instead of (de)serialized
form.
    */
   public enum Value {
     ReadAsBytes,
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/SerializerManager.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/SerializerManager.java
index da63f92..0cf2ceb 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/SerializerManager.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/SerializerManager.java
@@ -15,6 +15,7 @@
  */
 package edu.snu.nemo.runtime.executor.data;
 
+import edu.snu.nemo.common.coder.BytesCoder;
 import edu.snu.nemo.runtime.executor.data.streamchainer.CompressionStreamChainer;
 import edu.snu.nemo.runtime.executor.data.streamchainer.StreamChainer;
 import edu.snu.nemo.common.coder.Coder;
@@ -37,6 +38,8 @@ import java.util.concurrent.ConcurrentMap;
  */
 public final class SerializerManager {
   private static final Logger LOG = LoggerFactory.getLogger(SerializerManager.class.getName());
+  private static final Serializer<byte[]> AS_BYTES_SERIALIZER =
+      new Serializer<>(new BytesCoder(), Collections.emptyList());
   private final ConcurrentMap<String, Serializer> runtimeEdgeIdToSerializer = new ConcurrentHashMap<>();
 
   /**
@@ -86,4 +89,11 @@ public final class SerializerManager {
     }
     return serializer;
   }
+
+  /**
+   * @return a serializer which just converts between ByteStream and array of bytes.
+   */
+  public static Serializer<byte[]> getAsBytesSerializer() {
+    return AS_BYTES_SERIALIZER;
+  }
 }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
index dbb5600..174eb4e 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
@@ -38,10 +38,11 @@ public final class FileBlock<K extends Serializable> implements
Block<K> {
 
   /**
    * Constructor.
+   * If write (or read) as bytes is enabled, data written to (read from) the block does not
(de)serialized.
    *
-   * @param serializer    the {@link Serializer}.
-   * @param filePath the path of the file that this block will be stored.
-   * @param metadata the metadata for this block.
+   * @param serializer the {@link Serializer}.
+   * @param filePath   the path of the file that this block will be stored.
+   * @param metadata   the metadata for this block.\
    */
   public FileBlock(final Serializer serializer,
                    final String filePath,
@@ -55,7 +56,7 @@ public final class FileBlock<K extends Serializable> implements Block<K>
{
    * Writes the serialized data of this block having a specific key value as a partition
to the file
    * where this block resides.
    * Invariant: This method does not support concurrent write for a single block.
-   *            Only one thread have to write at once.
+   * Only one thread have to write at once.
    *
    * @param serializedPartitions the iterable of the serialized partitions to write.
    * @throws IOException if fail to write.
@@ -80,7 +81,8 @@ public final class FileBlock<K extends Serializable> implements Block<K>
{
    */
   @Override
   public Optional<List<Long>> putPartitions(final Iterable<NonSerializedPartition<K>>
partitions) throws IOException {
-    final Iterable<SerializedPartition<K>> convertedPartitions =
+    final Iterable<SerializedPartition<K>> convertedPartitions = metadata.isWriteAsBytes()
?
+        DataUtil.convertToSerPartitions(SerializerManager.getAsBytesSerializer(), partitions)
:
         DataUtil.convertToSerPartitions(serializer, partitions);
 
     return Optional.of(putSerializedPartitions(convertedPartitions));
@@ -114,6 +116,8 @@ public final class FileBlock<K extends Serializable> implements
Block<K> {
   @Override
   public Iterable<NonSerializedPartition<K>> getPartitions(final KeyRange keyRange)
throws IOException {
     // Deserialize the data
+    final Serializer serializerToUse = metadata.isReadAsBytes() ?
+        SerializerManager.getAsBytesSerializer() : serializer;
     final List<NonSerializedPartition<K>> deserializedPartitions = new ArrayList<>();
     try (final FileInputStream fileStream = new FileInputStream(filePath)) {
       for (final PartitionMetadata<K> partitionMetadata : metadata.getPartitionMetadataIterable())
{
@@ -129,7 +133,7 @@ public final class FileBlock<K extends Serializable> implements
Block<K> {
               new LimitedInputStream(fileStream, partitionMetadata.getPartitionSize());
           final NonSerializedPartition<K> deserializePartition =
               DataUtil.deserializePartition(
-                  partitionMetadata.getElementsTotal(), serializer, key, limitedInputStream);
+                  partitionMetadata.getElementsTotal(), serializerToUse, key, limitedInputStream);
           deserializedPartitions.add(deserializePartition);
           // rearrange file pointer
           final long toSkip = partitionMetadata.getPartitionSize() - availableBefore + fileStream.available();
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
index 31508e1..b6ad306 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
@@ -37,17 +37,26 @@ public final class NonSerializedMemoryBlock<K extends Serializable>
implements B
 
   private final List<NonSerializedPartition<K>> nonSerializedPartitions;
   private final Serializer serializer;
+  private final boolean readAsBytes;
+  private final boolean writeAsBytes;
   private volatile boolean committed;
 
   /**
    * Constructor.
+   * If write (or read) as bytes is enabled, data written to (read from) this block does
not (de)serialized.
    *
-   * @param serializer the {@link Serializer}.
+   * @param serializer   the {@link Serializer}.
+   * @param readAsBytes  whether read data from this block as arrays of bytes or not.
+   * @param writeAsBytes whether write data to this block as arrays of bytes or not.
    */
-  public NonSerializedMemoryBlock(final Serializer serializer) {
+  public NonSerializedMemoryBlock(final Serializer serializer,
+                                  final boolean readAsBytes,
+                                  final boolean writeAsBytes) {
     this.nonSerializedPartitions = new ArrayList<>();
     this.serializer = serializer;
     this.committed = false;
+    this.readAsBytes = readAsBytes;
+    this.writeAsBytes = writeAsBytes;
   }
 
   /**
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
index ec6ee77..1687f8e 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
@@ -37,17 +37,26 @@ public final class SerializedMemoryBlock<K extends Serializable>
implements Bloc
 
   private final List<SerializedPartition<K>> serializedPartitions;
   private final Serializer serializer;
+  private final boolean readAsBytes;
+  private final boolean writeAsBytes;
   private volatile boolean committed;
 
   /**
    * Constructor.
+   * If write (or read) as bytes is enabled, data written to (read from) this block does
not (de)serialized.
    *
-   * @param serializer the {@link Serializer}.
+   * @param serializer   the {@link Serializer}.
+   * @param readAsBytes  whether read data from this block as arrays of bytes or not.
+   * @param writeAsBytes whether write data to this block as arrays of bytes or not.
    */
-  public SerializedMemoryBlock(final Serializer serializer) {
+  public SerializedMemoryBlock(final Serializer serializer,
+                               final boolean readAsBytes,
+                               final boolean writeAsBytes) {
+    this.serializedPartitions = new ArrayList<>();
     this.serializer = serializer;
-    serializedPartitions = new ArrayList<>();
-    committed = false;
+    this.committed = false;
+    this.readAsBytes = readAsBytes;
+    this.writeAsBytes = writeAsBytes;
   }
 
   /**
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java
index a43d04e..fdfa075 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java
@@ -32,31 +32,48 @@ public abstract class FileMetadata<K extends Serializable> {
 
   private final List<PartitionMetadata<K>> partitionMetadataList; // The list
of partition metadata.
   private final AtomicBoolean committed;
+  private final boolean readAsBytes;
+  private final boolean writeAsBytes;
   private volatile long writtenBytesCursor; // Indicates how many bytes are (at least, logically)
written in the file.
 
   /**
    * Construct a new file metadata.
+   * If write (or read) as bytes is enabled, data written to (read from) the block does not
(de)serialized.
+   *
+   * @param readAsBytes  whether read data from this file as arrays of bytes or not.
+   * @param writeAsBytes whether write data to this file as arrays of bytes or not.
    */
-  public FileMetadata() {
+  public FileMetadata(final boolean readAsBytes,
+                      final boolean writeAsBytes) {
     this.partitionMetadataList = new ArrayList<>();
     this.writtenBytesCursor = 0;
     this.committed = new AtomicBoolean(false);
+    this.readAsBytes = readAsBytes;
+    this.writeAsBytes = writeAsBytes;
   }
 
   /**
    * Construct a file metadata with existing partition metadata.
+   * If write (or read) as bytes is enabled, data written to (read from) the block does not
(de)serialized.
+   *
    * @param partitionMetadataList the partition metadata list.
+   * @param readAsBytes           whether read data from this file as arrays of bytes or
not.
+   * @param writeAsBytes          whether write data to this file as arrays of bytes or not.
    */
-  public FileMetadata(final List<PartitionMetadata<K>> partitionMetadataList)
{
+  public FileMetadata(final List<PartitionMetadata<K>> partitionMetadataList,
+                      final boolean readAsBytes,
+                      final boolean writeAsBytes) {
     this.partitionMetadataList = partitionMetadataList;
     this.writtenBytesCursor = 0;
     this.committed = new AtomicBoolean(true);
+    this.readAsBytes = readAsBytes;
+    this.writeAsBytes = writeAsBytes;
   }
 
   /**
    * Writes the metadata for a partition.
    *
-   * @param key     the key of the partition.
+   * @param key           the key of the partition.
    * @param partitionSize the size of the partition.
    * @param elementsTotal the number of elements in the partition.
    * @throws IOException if fail to append the partition metadata.
@@ -100,9 +117,24 @@ public abstract class FileMetadata<K extends Serializable> {
 
   /**
    * Set the commit value.
+   *
    * @param committed whether this block is committed or not.
    */
   protected final void setCommitted(final boolean committed) {
     this.committed.set(committed);
   }
+
+  /**
+   * @return whether read data from this file as arrays of bytes or not.
+   */
+  public boolean isReadAsBytes() {
+    return readAsBytes;
+  }
+
+  /**
+   * @return whether write to this file as arrays of bytes or not.
+   */
+  public boolean isWriteAsBytes() {
+    return writeAsBytes;
+  }
 }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/LocalFileMetadata.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/LocalFileMetadata.java
index 349992a..6d2de00 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/LocalFileMetadata.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/LocalFileMetadata.java
@@ -28,9 +28,14 @@ public final class LocalFileMetadata<K extends Serializable> extends
FileMetadat
 
   /**
    * Constructor.
+   * If write (or read) as bytes is enabled, data written to (read from) the block does not
(de)serialized.
+   *
+   * @param readAsBytes  whether read data from this file as arrays of bytes or not.
+   * @param writeAsBytes whether write data to this file as arrays of bytes or not.
    */
-  public LocalFileMetadata() {
-    super();
+  public LocalFileMetadata(final boolean readAsBytes,
+                           final boolean writeAsBytes) {
+    super(readAsBytes, writeAsBytes);
   }
 
   /**
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/RemoteFileMetadata.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/RemoteFileMetadata.java
index 8f98aa8..be4e7ae 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/RemoteFileMetadata.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/RemoteFileMetadata.java
@@ -38,23 +38,33 @@ public final class RemoteFileMetadata<K extends Serializable> extends
FileMetada
 
   /**
    * Constructor for creating a non-committed new file metadata.
+   * If write (or read) as bytes is enabled, data written to (read from) the block does not
(de)serialized.
    *
    * @param metaFilePath the metadata file path.
+   * @param readAsBytes  whether read data from this file as arrays of bytes or not.
+   * @param writeAsBytes whether write data to this file as arrays of bytes or not.
    */
-  private RemoteFileMetadata(final String metaFilePath) {
-    super();
+  private RemoteFileMetadata(final String metaFilePath,
+                             final boolean readAsBytes,
+                             final boolean writeAsBytes) {
+    super(readAsBytes, writeAsBytes);
     this.metaFilePath = metaFilePath;
   }
 
   /**
    * Constructor for opening a existing file metadata.
+   * If write (or read) as bytes is enabled, data written to (read from) the block does not
(de)serialized.
    *
    * @param metaFilePath          the metadata file path.
    * @param partitionMetadataList the partition metadata list.
+   * @param readAsBytes  whether read data from this file as arrays of bytes or not.
+   * @param writeAsBytes whether write data to this file as arrays of bytes or not.
    */
   private RemoteFileMetadata(final String metaFilePath,
-                             final List<PartitionMetadata<K>> partitionMetadataList)
{
-    super(partitionMetadataList);
+                             final List<PartitionMetadata<K>> partitionMetadataList,
+                             final boolean readAsBytes,
+                             final boolean writeAsBytes) {
+    super(partitionMetadataList, readAsBytes, writeAsBytes);
     this.metaFilePath = metaFilePath;
   }
 
@@ -77,6 +87,8 @@ public final class RemoteFileMetadata<K extends Serializable> extends
FileMetada
         final FileOutputStream metafileOutputStream = new FileOutputStream(metaFilePath,
false);
         final DataOutputStream dataOutputStream = new DataOutputStream(metafileOutputStream)
     ) {
+      dataOutputStream.writeBoolean(isReadAsBytes());
+      dataOutputStream.writeBoolean(isWriteAsBytes());
       for (PartitionMetadata<K> partitionMetadata : partitionMetadataItr) {
         final byte[] key = SerializationUtils.serialize(partitionMetadata.getKey());
         dataOutputStream.writeInt(key.length);
@@ -91,13 +103,18 @@ public final class RemoteFileMetadata<K extends Serializable> extends
FileMetada
 
   /**
    * Creates a new block metadata.
+   * If write (or read) as bytes is enabled, data written to (read from) the block does not
(de)serialized.
    *
    * @param metaFilePath the path of the file to write metadata.
+   * @param readAsBytes  whether read data from this file as arrays of bytes or not.
+   * @param writeAsBytes whether write data to this file as arrays of bytes or not.
    * @param <T>          the key type of the block's partitions.
    * @return the created block metadata.
    */
-  public static <T extends Serializable> RemoteFileMetadata<T> create(final String
metaFilePath) {
-    return new RemoteFileMetadata<>(metaFilePath);
+  public static <T extends Serializable> RemoteFileMetadata<T> create(final String
metaFilePath,
+                                                                      final boolean readAsBytes,
+                                                                      final boolean writeAsBytes)
{
+    return new RemoteFileMetadata<>(metaFilePath, readAsBytes, writeAsBytes);
   }
 
   /**
@@ -113,10 +130,14 @@ public final class RemoteFileMetadata<K extends Serializable>
extends FileMetada
       throw new IOException("File " + metaFilePath + " does not exist!");
     }
     final List<PartitionMetadata<T>> partitionMetadataList = new ArrayList<>();
+    final boolean readAsBytes;
+    final boolean writeAsBytes;
     try (
         final FileInputStream metafileInputStream = new FileInputStream(metaFilePath);
         final DataInputStream dataInputStream = new DataInputStream(metafileInputStream)
     ) {
+      readAsBytes = dataInputStream.readBoolean();
+      writeAsBytes = dataInputStream.readBoolean();
       while (dataInputStream.available() > 0) {
         final int keyLength = dataInputStream.readInt();
         final byte[] desKey = new byte[keyLength];
@@ -133,6 +154,6 @@ public final class RemoteFileMetadata<K extends Serializable> extends
FileMetada
         partitionMetadataList.add(partitionMetadata);
       }
     }
-    return new RemoteFileMetadata<>(metaFilePath, partitionMetadataList);
+    return new RemoteFileMetadata<>(metaFilePath, partitionMetadataList, readAsBytes,
writeAsBytes);
   }
 }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/AbstractBlockStore.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/AbstractBlockStore.java
index b1431fa..ad77f03 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/AbstractBlockStore.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/AbstractBlockStore.java
@@ -35,12 +35,23 @@ public abstract class AbstractBlockStore implements BlockStore {
   }
 
   /**
+   * Creates a new block.
+   *
+   * @param blockId the ID of the block to create.
+   * @see BlockStore#createBlock(String)
+   */
+  @Override
+  public void createBlock(final String blockId) {
+    createBlock(blockId, false, false);
+  }
+
+  /**
    * Gets data coder for a block from the {@link SerializerManager}.
    *
    * @param blockId the ID of the block to get the coder.
    * @return the coder.
    */
-  public final Serializer getSerializerFromWorker(final String blockId) {
+  protected final Serializer getSerializerFromWorker(final String blockId) {
     final String runtimeEdgeId = RuntimeIdGenerator.getRuntimeEdgeIdFromBlockId(blockId);
     return serializerManager.getSerializer(runtimeEdgeId);
   }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/BlockStore.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/BlockStore.java
index 8be11a6..4473048 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/BlockStore.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/BlockStore.java
@@ -42,6 +42,23 @@ public interface BlockStore {
   void createBlock(String blockId) throws BlockWriteException;
 
   /**
+   * Creates a new block.
+   * A stale data created by previous failed task should be handled during the creation of
new block.
+   * If write (or read) as bytes is enabled, data written to (read from) the block does not
(de)serialized.
+   *
+   * @param blockId      the ID of the block to create.
+   * @param readAsBytes  whether read data as arrays of bytes or not.
+   * @param writeAsBytes whether write data as arrays of bytes or not.
+   * @throws BlockWriteException for any error occurred while trying to create a block.
+   *         (This exception will be thrown to the scheduler
+   *          through {@link edu.snu.nemo.runtime.executor.Executor} and
+   *          have to be handled by the scheduler with fault tolerance mechanism.)
+   */
+  void createBlock(String blockId,
+                   boolean readAsBytes,
+                   boolean writeAsBytes) throws BlockWriteException;
+
+  /**
    * Saves an iterable of {@link NonSerializedPartition}s to a block.
    * If the block exists already, appends the data to it.
    * Invariant: This method may not support concurrent write for a single block.
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/GlusterFileStore.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/GlusterFileStore.java
index ef48b54..d44dbd4 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/GlusterFileStore.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/GlusterFileStore.java
@@ -67,17 +67,22 @@ public final class GlusterFileStore extends AbstractBlockStore implements
Remote
 
   /**
    * Creates a new block.
+   * If write (or read) as bytes is enabled, data written to (read from) the block does not
(de)serialized.
    *
-   * @param blockId the ID of the block to create.
+   * @param blockId      the ID of the block to create.
+   * @param readAsBytes  whether read data as arrays of bytes or not.
+   * @param writeAsBytes whether write data as arrays of bytes or not.
    * @see BlockStore#createBlock(String)
    */
   @Override
-  public void createBlock(final String blockId) {
+  public void createBlock(final String blockId,
+                          final boolean readAsBytes,
+                          final boolean writeAsBytes) {
     removeBlock(blockId);
     final Serializer serializer = getSerializerFromWorker(blockId);
     final String filePath = DataUtil.blockIdToFilePath(blockId, fileDirectory);
     final RemoteFileMetadata metadata =
-        RemoteFileMetadata.create(DataUtil.blockIdToMetaFilePath(blockId, fileDirectory));
+        RemoteFileMetadata.create(DataUtil.blockIdToMetaFilePath(blockId, fileDirectory),
readAsBytes, writeAsBytes);
     final FileBlock block = new FileBlock<>(serializer, filePath, metadata);
     blockMap.put(blockId, block);
   }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/LocalFileStore.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/LocalFileStore.java
index fd54512..9ec0a22 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/LocalFileStore.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/LocalFileStore.java
@@ -52,16 +52,21 @@ public final class LocalFileStore extends LocalBlockStore implements FileStore
{
 
   /**
    * Creates a new block.
+   * If write (or read) as bytes is enabled, data written to (read from) the block does not
(de)serialized.
    *
-   * @param blockId the ID of the block to create.
+   * @param blockId      the ID of the block to create.
+   * @param readAsBytes  whether read data as arrays of bytes or not.
+   * @param writeAsBytes whether write data as arrays of bytes or not.
    * @see BlockStore#createBlock(String)
    */
   @Override
-  public void createBlock(final String blockId) {
+  public void createBlock(final String blockId,
+                          final boolean readAsBytes,
+                          final boolean writeAsBytes) {
     removeBlock(blockId);
 
     final Serializer serializer = getSerializerFromWorker(blockId);
-    final LocalFileMetadata metadata = new LocalFileMetadata();
+    final LocalFileMetadata metadata = new LocalFileMetadata(readAsBytes, writeAsBytes);
 
     final FileBlock block =
         new FileBlock(serializer, DataUtil.blockIdToFilePath(blockId, fileDirectory), metadata);
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/MemoryStore.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/MemoryStore.java
index d0b593e..4791b1d 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/MemoryStore.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/MemoryStore.java
@@ -39,12 +39,14 @@ public final class MemoryStore extends LocalBlockStore {
   }
 
   /**
-   * @see BlockStore#createBlock(String)
+   * @see BlockStore#createBlock(String, boolean, boolean).
    */
   @Override
-  public void createBlock(final String blockId) {
+  public void createBlock(final String blockId,
+                          final boolean readAsBytes,
+                          final boolean writeAsBytes) {
     final Serializer serializer = getSerializerFromWorker(blockId);
-    getBlockMap().put(blockId, new NonSerializedMemoryBlock(serializer));
+    getBlockMap().put(blockId, new NonSerializedMemoryBlock(serializer, readAsBytes, writeAsBytes));
   }
 
   /**
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/SerializedMemoryStore.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/SerializedMemoryStore.java
index 718a221..7ee1d81 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/SerializedMemoryStore.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/SerializedMemoryStore.java
@@ -30,6 +30,7 @@ public final class SerializedMemoryStore extends LocalBlockStore {
 
   /**
    * Constructor.
+   *
    * @param serializerManager the serializer manager.
    */
   @Inject
@@ -38,12 +39,14 @@ public final class SerializedMemoryStore extends LocalBlockStore {
   }
 
   /**
-   * @see BlockStore#createBlock(String)
+   * @see BlockStore#createBlock(String, boolean, boolean).
    */
   @Override
-  public void createBlock(final String blockId) {
+  public void createBlock(final String blockId,
+                          final boolean readAsBytes,
+                          final boolean writeAsBytes) {
     final Serializer serializer = getSerializerFromWorker(blockId);
-    getBlockMap().put(blockId, new SerializedMemoryBlock(serializer));
+    getBlockMap().put(blockId, new SerializedMemoryBlock(serializer, readAsBytes, writeAsBytes));
   }
 
   /**

-- 
To stop receiving notification emails like this one, please contact
jeongyoon@apache.org.

Mime
View raw message