nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jan...@apache.org
Subject [incubator-nemo] branch master updated: [NEMO-72] Instance-based Encoder/Decoder interface (#48)
Date Tue, 19 Jun 2018 05:04:06 GMT
This is an automated email from the ASF dual-hosted git repository.

jangho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 5dd93ac  [NEMO-72] Instance-based Encoder/Decoder interface (#48)
5dd93ac is described below

commit 5dd93acb696496380cbb25a84121c4d1cab0b71a
Author: Sanha Lee <sanhaleehana@gmail.com>
AuthorDate: Tue Jun 19 14:04:04 2018 +0900

    [NEMO-72] Instance-based Encoder/Decoder interface (#48)
    
    JIRA: [NEMO-72: Instance-based Encoder/Decoder interface](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-72)
    
    **Major changes:**
    - Modify the interface of `Coder` from static fashion to instance-based fashion
      (to avoid generating an instance-based coder, such as a Spark serializer, for every encoding).
    - Split `Coder` into `Encoder` and `Decoder`
      (to support some use cases to separate the encoding and decoding like in [#32](https://issues.apache.org/jira/browse/NEMO-32)).
    - Split `StreamChainer` into `EncodeStreamChainer` and `DecodeStreamChainer`.
    
    **Minor changes to note:**
    - Split `CoderProperty` into `EncoderProperty` and `DecoderProperty`
    - Split `CompressionProperty` into `CompressionProperty` and `DecompressionProperty`
    
    **Tests for the changes:**
    - Existing unit tests for `Coder`, `CoderProperty`, `Compression`, `CompressionProperty`, and other data-plane side tests cover this change.
    - Existing integration tests also cover this change.
    
    **Other comments:**
    - N/A.
    
    resolves [NEMO-72](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-72)
---
 bin/json2dot.py                                    |  15 ++-
 .../java/edu/snu/nemo/common/coder/BytesCoder.java |  53 --------
 .../main/java/edu/snu/nemo/common/coder/Coder.java |  77 -----------
 .../edu/snu/nemo/common/coder/DecoderFactory.java  |  91 +++++++++++++
 .../edu/snu/nemo/common/coder/EncoderFactory.java  |  92 +++++++++++++
 .../java/edu/snu/nemo/common/coder/IntCoder.java   |  52 --------
 .../snu/nemo/common/coder/IntDecoderFactory.java   |  70 ++++++++++
 .../snu/nemo/common/coder/IntEncoderFactory.java   |  70 ++++++++++
 .../java/edu/snu/nemo/common/coder/PairCoder.java  |  83 ------------
 .../snu/nemo/common/coder/PairDecoderFactory.java  |  95 +++++++++++++
 .../snu/nemo/common/coder/PairEncoderFactory.java  |  96 ++++++++++++++
 .../executionproperty/CompressionProperty.java     |   2 +
 .../{CoderProperty.java => DecoderProperty.java}   |  12 +-
 ...derProperty.java => DecompressionProperty.java} |  12 +-
 .../{CoderProperty.java => EncoderProperty.java}   |  12 +-
 .../edu/snu/nemo/common/ir/LoopVertexTest.java     |   1 -
 .../ExecutionPropertyMapTest.java                  |  17 +--
 .../frontend/beam/NemoPipelineVisitor.java         |  84 +++++++-----
 .../compiler/frontend/beam/coder/BeamCoder.java    |  70 ----------
 .../frontend/beam/coder/BeamDecoderFactory.java    | 147 +++++++++++++++++++++
 .../frontend/beam/coder/BeamEncoderFactory.java    | 112 ++++++++++++++++
 .../frontend/beam/coder/FloatArrayCoder.java       |   2 +-
 .../frontend/beam/coder/IntArrayCoder.java         |   2 +-
 .../compiler/frontend/spark/coder/SparkCoder.java  |  51 -------
 .../frontend/spark/coder/SparkDecoderFactory.java  |  71 ++++++++++
 .../frontend/spark/coder/SparkEncoderFactory.java  |  71 ++++++++++
 .../frontend/spark/core/SparkFrontendUtils.java    |  15 ++-
 .../frontend/spark/core/rdd/PairRDDFunctions.scala |   9 +-
 .../compiler/frontend/spark/core/rdd/RDD.scala     |  22 +--
 .../compiletime/annotating/CompressionPass.java    |   1 +
 ...CompressionPass.java => DecompressionPass.java} |  25 ++--
 ...eCoderPass.java => DefaultEdgeDecoderPass.java} |  19 +--
 ...eCoderPass.java => DefaultEdgeEncoderPass.java} |  19 +--
 .../composite/PrimitiveCompositePass.java          |   6 +-
 .../CommonSubexpressionEliminationPass.java        |  16 ++-
 .../reshaping/DataSkewReshapingPass.java           |   6 +-
 .../compiletime/reshaping/LoopOptimizations.java   |   6 +-
 .../reshaping/SailfishRelayReshapingPass.java      |   6 +-
 .../spark/sql/JavaUserDefinedTypedAggregation.java |   4 +-
 .../edu/snu/nemo/runtime/executor/Executor.java    |  22 ++-
 .../snu/nemo/runtime/executor/data/DataUtil.java   | 103 +++++++++------
 .../runtime/executor/data/SerializerManager.java   |  54 +++++---
 .../data/partition/SerializedPartition.java        |  12 +-
 .../streamchainer/CompressionStreamChainer.java    |  19 +--
 ...StreamChainer.java => DecodeStreamChainer.java} |  17 +--
 ...hainer.java => DecompressionStreamChainer.java} |  21 +--
 ...StreamChainer.java => EncodeStreamChainer.java} |  15 +--
 .../executor/data/streamchainer/Serializer.java    |  64 +++++----
 .../nemo/runtime/executor/data/BlockStoreTest.java |  13 +-
 .../executor/datatransfer/DataTransferTest.java    |  53 ++++----
 .../runtime/executor/task/TaskExecutorTest.java    |   1 -
 .../annotating/DefaultEdgeCoderPassTest.java       |  51 ++++---
 .../compiletime/reshaping/LoopFusionPassTest.java  |   9 +-
 .../reshaping/LoopInvariantCodeMotionPassTest.java |   6 +-
 .../optimizer/policy/PolicyBuilderTest.java        |   6 +-
 55 files changed, 1334 insertions(+), 746 deletions(-)

diff --git a/bin/json2dot.py b/bin/json2dot.py
index 20fa177..6f2d339 100755
--- a/bin/json2dot.py
+++ b/bin/json2dot.py
@@ -305,7 +305,8 @@ class IREdge:
         self.dst = dst
         self.id = properties['id']
         self.executionProperties = properties['executionProperties']
-        self.coder = self.executionProperties['Coder']
+        self.encoderFactory = self.executionProperties['Encoder']
+        self.decoderFactory = self.executionProperties['Decoder']
     @property
     def dot(self):
         src = self.src
@@ -318,7 +319,7 @@ class IREdge:
             dst = dst.internalDstFor(self.id)
         except:
             pass
-        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(self.id, edgePropertiesString(self.executionProperties), self.coder)
+        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}<BR/>{}</FONT>'.format(self.id, edgePropertiesString(self.executionProperties), self.encoderFactory, self.decoderFactory)
         return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(src.oneVertex.idx,
                 dst.oneVertex.idx, src.logicalEnd, dst.logicalEnd, label)
 
@@ -328,10 +329,11 @@ class StageEdge:
         self.dst = dst.internalDAG.vertices[properties['dstVertex']]
         self.runtimeEdgeId = properties['runtimeEdgeId']
         self.executionProperties = properties['executionProperties']
-        self.coder = self.executionProperties['Coder']
+        self.encoderFactory = self.executionProperties['Encoder']
+        self.decoderFactory = self.executionProperties['Decoder']
     @property
     def dot(self):
-        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.executionProperties), self.coder)
+        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}<BR/>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.executionProperties), self.encoderFactory, self.decoderFactory)
         return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(self.src.oneVertex.idx,
                 self.dst.oneVertex.idx, self.src.logicalEnd, self.dst.logicalEnd, label)
 
@@ -341,10 +343,11 @@ class RuntimeEdge:
         self.dst = dst
         self.runtimeEdgeId = properties['runtimeEdgeId']
         self.executionProperties = properties['executionProperties']
-        self.coder = self.executionProperties['Coder']
+        self.encoderFactory = self.executionProperties['Encoder']
+        self.decoderFactory = self.executionProperties['Decoder']
     @property
     def dot(self):
-        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.executionProperties), self.coder)
+        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}<BR/>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.executionProperties), self.encoderFactory, self.decoderFactory)
         return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(self.src.oneVertex.idx,
                 self.dst.oneVertex.idx, self.src.logicalEnd, self.dst.logicalEnd, label)
 
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/BytesCoder.java b/common/src/main/java/edu/snu/nemo/common/coder/BytesCoder.java
deleted file mode 100644
index 3e467f5..0000000
--- a/common/src/main/java/edu/snu/nemo/common/coder/BytesCoder.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.common.coder;
-
-import java.io.*;
-
-/**
- * A {@link Coder} which is used for an array of bytes.
- */
-public final class BytesCoder implements Coder<byte[]> {
-
-  /**
-   * Constructor.
-   */
-  public BytesCoder() {
-  }
-
-  @Override
-  public void encode(final byte[] value, final OutputStream outStream) throws IOException {
-    try (final DataOutputStream dataOutputStream = new DataOutputStream(outStream)) {
-      dataOutputStream.writeInt(value.length); // Write the size of this byte array.
-      dataOutputStream.write(value);
-    }
-  }
-
-  @Override
-  public byte[] decode(final InputStream inStream) throws IOException {
-    // If the inStream is closed well in upper level, it is okay to not close this stream
-    // because the DataInputStream itself will not contain any extra information.
-    // (when we close this stream, the inStream will be closed together.)
-    final DataInputStream dataInputStream = new DataInputStream(inStream);
-    final int bytesToRead = dataInputStream.readInt();
-    final byte[] bytes = new byte[bytesToRead]; // Read the size of this byte array.
-    final int readBytes = dataInputStream.read(bytes);
-    if (bytesToRead != readBytes) {
-      throw new IOException("Have to read " + bytesToRead + " but read only " + readBytes + " bytes.");
-    }
-    return bytes;
-  }
-}
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/Coder.java b/common/src/main/java/edu/snu/nemo/common/coder/Coder.java
deleted file mode 100644
index 49c56d1..0000000
--- a/common/src/main/java/edu/snu/nemo/common/coder/Coder.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.common.coder;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-/**
- * A coder object encodes or decodes values of type {@code T} into byte streams.
- *
- * @param <T> element type.
- */
-public interface Coder<T> extends Serializable {
-  /**
-   * Encodes the given value onto the specified output stream.
-   * It have to be able to encode the given stream consequently by calling this method repeatedly.
-   * Because the user can want to keep a single output stream and continuously concatenate elements,
-   * the output stream should not be closed.
-   *
-   * @param element   the element to be encoded
-   * @param outStream the stream on which encoded bytes are written
-   * @throws IOException if fail to encode
-   */
-  void encode(T element, OutputStream outStream) throws IOException;
-
-  /**
-   * Decodes the a value from the given input stream.
-   * It have to be able to decode the given stream consequently by calling this method repeatedly.
-   * Because there are many elements in the input stream, the stream should not be closed.
-   *
-   * @param inStream the stream from which bytes are read
-   * @return the decoded element
-   * @throws IOException if fail to decode
-   */
-  T decode(InputStream inStream) throws IOException;
-
-  /**
-   * Dummy coder.
-   */
-  Coder DUMMY_CODER = new DummyCoder();
-
-  /**
-   * Dummy coder implementation which is not supposed to be used.
-   */
-  final class DummyCoder implements Coder {
-
-    @Override
-    public void encode(final Object value, final OutputStream outStream) {
-      throw new RuntimeException("DummyCoder is not supposed to be used.");
-    }
-
-    @Override
-    public Object decode(final InputStream inStream) {
-      throw new RuntimeException("DummyCoder is not supposed to be used.");
-    }
-
-    @Override
-    public String toString() {
-      return "DUMMY_CODER";
-    }
-  }
-}
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java
new file mode 100644
index 0000000..a93d843
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.coder;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+
+/**
+ * A decoder factory object which generates decoders that decode values of type {@code T} from byte streams.
+ * To avoid generating an instance-based coder such as a Spark serializer for every decoding,
+ * the user needs to instantiate a decoder instance and use it.
+ *
+ * @param <T> element type.
+ */
+public interface DecoderFactory<T> extends Serializable {
+
+  /**
+   * Get a decoder instance.
+   *
+   * @param inputStream the input stream to decode.
+   * @return the decoder instance.
+   * @throws IOException if fail to get the instance.
+   */
+  Decoder<T> create(InputStream inputStream) throws IOException;
+
+  /**
+   * Interface of Decoder.
+   *
+   * @param <T> element type.
+   */
+  interface Decoder<T> extends Serializable {
+
+    /**
+     * Decodes a value from the given input stream.
+     * It has to be able to decode the given stream consecutively by calling this method repeatedly.
+     * Because there are many elements in the input stream, the stream should not be closed.
+     *
+     * @return the decoded element
+     * @throws IOException if fail to decode
+     */
+    T decode() throws IOException;
+  }
+
+  /**
+   * Dummy decoder factory.
+   */
+  DecoderFactory DUMMY_DECODER_FACTORY = new DummyDecoderFactory();
+
+  /**
+   * Dummy decoder factory implementation which is not supposed to be used.
+   */
+  final class DummyDecoderFactory implements DecoderFactory {
+
+    private final Decoder dummyDecoder = new DummyDecoder();
+
+    /**
+     * DummyDecoder.
+     */
+    private final class DummyDecoder implements Decoder {
+
+      @Override
+      public Object decode() {
+        throw new RuntimeException("DummyDecoder is not supposed to be used.");
+      }
+    }
+
+    @Override
+    public Decoder create(final InputStream inputStream) {
+      return dummyDecoder;
+    }
+
+    @Override
+    public String toString() {
+      return "DUMMY_DECODER_FACTORY";
+    }
+  }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java
new file mode 100644
index 0000000..d63fafb
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.coder;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * An encoder factory object which generates encoders that encode values of type {@code T} into byte streams.
+ * To avoid generating an instance-based coder such as a Spark serializer for every encoding,
+ * the user needs to explicitly instantiate an encoder instance and use it.
+ *
+ * @param <T> element type.
+ */
+public interface EncoderFactory<T> extends Serializable {
+
+  /**
+   * Get an encoder instance.
+   *
+   * @param outputStream the stream on which encoded bytes are written
+   * @return the encoder instance.
+   * @throws IOException if fail to get the instance.
+   */
+  Encoder<T> create(OutputStream outputStream) throws IOException;
+
+  /**
+   * Interface of Encoder.
+   *
+   * @param <T> element type.
+   */
+  interface Encoder<T> extends Serializable {
+
+    /**
+     * Encodes the given value onto the specified output stream.
+     * It has to be able to encode elements consecutively by calling this method repeatedly.
+     * Because the user may want to keep a single output stream and continuously concatenate elements,
+     * the output stream should not be closed.
+     *
+     * @param element the element to be encoded
+     * @throws IOException if fail to encode
+     */
+    void encode(T element) throws IOException;
+  }
+
+  /**
+   * Dummy encoder factory.
+   */
+  EncoderFactory DUMMY_ENCODER_FACTORY = new DummyEncoderFactory();
+
+  /**
+   * Dummy encoder factory implementation which is not supposed to be used.
+   */
+  final class DummyEncoderFactory implements EncoderFactory {
+
+    private final Encoder dummyEncoder = new DummyEncoder();
+
+    /**
+     * DummyEncoder.
+     */
+    private final class DummyEncoder implements Encoder {
+
+      @Override
+      public void encode(final Object element) {
+        throw new RuntimeException("DummyEncoder is not supposed to be used.");
+      }
+    }
+
+    @Override
+    public Encoder create(final OutputStream outputStream) {
+      return dummyEncoder;
+    }
+
+    @Override
+    public String toString() {
+      return "DUMMY_ENCODER_FACTORY";
+    }
+  }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/IntCoder.java b/common/src/main/java/edu/snu/nemo/common/coder/IntCoder.java
deleted file mode 100644
index face994..0000000
--- a/common/src/main/java/edu/snu/nemo/common/coder/IntCoder.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.common.coder;
-
-import java.io.*;
-
-/**
- * A {@link Coder} which is used for an integer.
- */
-public final class IntCoder implements Coder<Integer> {
-
-  /**
-   * A private constructor.
-   */
-  private IntCoder() {
-  }
-
-  /**
-   * Static initializer of the coder.
-   */
-  public static IntCoder of() {
-    return new IntCoder();
-  }
-
-  @Override
-  public void encode(final Integer value, final OutputStream outStream) throws IOException {
-    final DataOutputStream dataOutputStream = new DataOutputStream(outStream);
-    dataOutputStream.writeInt(value);
-  }
-
-  @Override
-  public Integer decode(final InputStream inStream) throws IOException {
-    // If the inStream is closed well in upper level, it is okay to not close this stream
-    // because the DataInputStream itself will not contain any extra information.
-    // (when we close this stream, the inStream will be closed together.)
-    final DataInputStream dataInputStream = new DataInputStream(inStream);
-    return dataInputStream.readInt();
-  }
-}
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/IntDecoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/IntDecoderFactory.java
new file mode 100644
index 0000000..d79ec35
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/coder/IntDecoderFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.coder;
+
+import java.io.*;
+
+/**
+ * A {@link DecoderFactory} which is used for an integer.
+ */
+public final class IntDecoderFactory implements DecoderFactory<Integer> {
+
+  private static final IntDecoderFactory INT_DECODER_FACTORY = new IntDecoderFactory();
+
+  /**
+   * A private constructor.
+   */
+  private IntDecoderFactory() {
+    // do nothing.
+  }
+
+  /**
+   * Static initializer of the coder.
+   */
+  public static IntDecoderFactory of() {
+    return INT_DECODER_FACTORY;
+  }
+
+  @Override
+  public Decoder<Integer> create(final InputStream inputStream) {
+    return new IntDecoder(inputStream);
+  }
+
+  /**
+   * IntDecoder.
+   */
+  private final class IntDecoder implements Decoder<Integer> {
+
+    private final DataInputStream inputStream;
+
+    /**
+     * Constructor.
+     *
+     * @param inputStream  the input stream to decode.
+     */
+    private IntDecoder(final InputStream inputStream) {
+      // If the inputStream is closed well in upper level, it is okay to not close this stream
+      // because the DataInputStream itself will not contain any extra information.
+      // (when we close this stream, the input will be closed together.)
+      this.inputStream = new DataInputStream(inputStream);
+    }
+
+    @Override
+    public Integer decode() throws IOException {
+      return inputStream.readInt();
+    }
+  }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/IntEncoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/IntEncoderFactory.java
new file mode 100644
index 0000000..0a40de5
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/coder/IntEncoderFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.coder;
+
+import java.io.*;
+
+/**
+ * A {@link EncoderFactory} which is used for an integer.
+ */
+public final class IntEncoderFactory implements EncoderFactory<Integer> {
+
+  private static final IntEncoderFactory INT_ENCODER_FACTORY = new IntEncoderFactory();
+
+  /**
+   * A private constructor.
+   */
+  private IntEncoderFactory() {
+    // do nothing.
+  }
+
+  /**
+   * Static initializer of the coder.
+   */
+  public static IntEncoderFactory of() {
+    return INT_ENCODER_FACTORY;
+  }
+
+  @Override
+  public Encoder<Integer> create(final OutputStream outputStream) {
+    return new IntEncoder(outputStream);
+  }
+
+  /**
+   * IntEncoder.
+   */
+  private final class IntEncoder implements Encoder<Integer> {
+
+    private final DataOutputStream outputStream;
+
+    /**
+     * Constructor.
+     *
+     * @param outputStream the output stream to store the encoded bytes.
+     */
+    private IntEncoder(final OutputStream outputStream) {
+      // If the outputStream is closed well in upper level, it is okay to not close this stream
+      // because the DataOutputStream itself will not contain any extra information.
+      // (when we close this stream, the output will be closed together.)
+      this.outputStream = new DataOutputStream(outputStream);
+    }
+
+    @Override
+    public void encode(final Integer value) throws IOException {
+      outputStream.writeInt(value);
+    }
+  }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/PairCoder.java b/common/src/main/java/edu/snu/nemo/common/coder/PairCoder.java
deleted file mode 100644
index 3ad3552..0000000
--- a/common/src/main/java/edu/snu/nemo/common/coder/PairCoder.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.common.coder;
-
-import edu.snu.nemo.common.Pair;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * A Coder for {@link edu.snu.nemo.common.Pair}. Reference: KvCoder in BEAM.
- * @param <A> type for the left coder.
- * @param <B> type for the right coder.
- */
-public final class PairCoder<A, B> implements Coder<Pair<A, B>> {
-  private final Coder<A> leftCoder;
-  private final Coder<B> rightCoder;
-
-  /**
-   * Private constructor of PairCoder class.
-   * @param leftCoder coder for right element.
-   * @param rightCoder coder for right element.
-   */
-  private PairCoder(final Coder<A> leftCoder, final Coder<B> rightCoder) {
-    this.leftCoder = leftCoder;
-    this.rightCoder = rightCoder;
-  }
-
-  /**
-   * static initializer of the class.
-   * @param leftCoder left coder.
-   * @param rightCoder right coder.
-   * @param <A> type of the left element.
-   * @param <B> type of the right element.
-   * @return the new PairCoder.
-   */
-  public static <A, B> PairCoder<A, B> of(final Coder<A> leftCoder, final Coder<B> rightCoder) {
-    return new PairCoder<>(leftCoder, rightCoder);
-  }
-
-  /**
-   * @return the left coder.
-   */
-  Coder<A> getLeftCoder() {
-    return leftCoder;
-  }
-
-  /**
-   * @return the right coder.
-   */
-  Coder<B> getRightCoder() {
-    return rightCoder;
-  }
-
-  @Override
-  public void encode(final Pair<A, B> pair, final OutputStream outStream) throws IOException {
-    if (pair == null) {
-      throw new IOException("cannot encode a null pair");
-    }
-    leftCoder.encode(pair.left(), outStream);
-    rightCoder.encode(pair.right(), outStream);
-  }
-
-  @Override
-  public Pair<A, B> decode(final InputStream inStream) throws IOException {
-    final A key = leftCoder.decode(inStream);
-    final B value = rightCoder.decode(inStream);
-    return Pair.of(key, value);
-  }
-}
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/PairDecoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/PairDecoderFactory.java
new file mode 100644
index 0000000..9cd0fb1
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/coder/PairDecoderFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.coder;
+
+import edu.snu.nemo.common.Pair;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A DecoderFactory for {@link Pair}. Reference: KvCoder in BEAM.
+ * @param <A> type for the left coder.
+ * @param <B> type for the right coder.
+ */
+public final class PairDecoderFactory<A, B> implements DecoderFactory<Pair<A, B>> {
+  private final DecoderFactory<A> leftDecoderFactory;
+  private final DecoderFactory<B> rightDecoderFactory;
+
+  /**
+   * Private constructor of PairDecoderFactory class.
+   *
+   * @param leftDecoderFactory  coder for left element.
+   * @param rightDecoderFactory coder for right element.
+   */
+  private PairDecoderFactory(final DecoderFactory<A> leftDecoderFactory,
+                             final DecoderFactory<B> rightDecoderFactory) {
+    this.leftDecoderFactory = leftDecoderFactory;
+    this.rightDecoderFactory = rightDecoderFactory;
+  }
+
+  /**
+   * static initializer of the class.
+   *
+   * @param leftDecoderFactory  left coder.
+   * @param rightDecoderFactory right coder.
+   * @param <A>          type of the left element.
+   * @param <B>          type of the right element.
+   * @return the new PairDecoderFactory.
+   */
+  public static <A, B> PairDecoderFactory<A, B> of(final DecoderFactory<A> leftDecoderFactory,
+                                                   final DecoderFactory<B> rightDecoderFactory) {
+    return new PairDecoderFactory<>(leftDecoderFactory, rightDecoderFactory);
+  }
+
+  @Override
+  public Decoder<Pair<A, B>> create(final InputStream inputStream) throws IOException {
+    return new PairDecoder<>(inputStream, leftDecoderFactory, rightDecoderFactory);
+  }
+
+  /**
+   * PairDecoder.
+   * @param <T1> type for the left coder.
+   * @param <T2> type for the right coder.
+   */
+  private final class PairDecoder<T1, T2> implements Decoder<Pair<T1, T2>> {
+
+    private final Decoder<T1> leftDecoder;
+    private final Decoder<T2> rightDecoder;
+
+    /**
+     * Constructor.
+     *
+     * @param inputStream  the input stream to decode.
+     * @param leftDecoderFactory  the actual decoder to use for left elements.
+     * @param rightDecoderFactory the actual decoder to use for right elements.
+     * @throws IOException if fail to instantiate coders.
+     */
+    private PairDecoder(final InputStream inputStream,
+                        final DecoderFactory<T1> leftDecoderFactory,
+                        final DecoderFactory<T2> rightDecoderFactory) throws IOException {
+      this.leftDecoder = leftDecoderFactory.create(inputStream);
+      this.rightDecoder = rightDecoderFactory.create(inputStream);
+    }
+
+    @Override
+    public Pair<T1, T2> decode() throws IOException {
+      final T1 key = leftDecoder.decode();
+      final T2 value = rightDecoder.decode();
+      return Pair.of(key, value);
+    }
+  }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/PairEncoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/PairEncoderFactory.java
new file mode 100644
index 0000000..0837b57
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/coder/PairEncoderFactory.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.coder;
+
+import edu.snu.nemo.common.Pair;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An EncoderFactory for {@link Pair}. Reference: KvCoder in BEAM.
+ * @param <A> type for the left coder.
+ * @param <B> type for the right coder.
+ */
+public final class PairEncoderFactory<A, B> implements EncoderFactory<Pair<A, B>> {
+  private final EncoderFactory<A> leftEncoderFactory;
+  private final EncoderFactory<B> rightEncoderFactory;
+
+  /**
+   * Private constructor of PairEncoderFactory class.
+   *
+   * @param leftEncoderFactory  coder for left element.
+   * @param rightEncoderFactory coder for right element.
+   */
+  private PairEncoderFactory(final EncoderFactory<A> leftEncoderFactory,
+                             final EncoderFactory<B> rightEncoderFactory) {
+    this.leftEncoderFactory = leftEncoderFactory;
+    this.rightEncoderFactory = rightEncoderFactory;
+  }
+
+  /**
+   * static initializer of the class.
+   *
+   * @param leftEncoderFactory  left coder.
+   * @param rightEncoderFactory right coder.
+   * @param <A>          type of the left element.
+   * @param <B>          type of the right element.
+   * @return the new PairEncoderFactory.
+   */
+  public static <A, B> PairEncoderFactory<A, B> of(final EncoderFactory<A> leftEncoderFactory,
+                                                   final EncoderFactory<B> rightEncoderFactory) {
+    return new PairEncoderFactory<>(leftEncoderFactory, rightEncoderFactory);
+  }
+
+  @Override
+  public Encoder<Pair<A, B>> create(final OutputStream outputStream) throws IOException {
+    return new PairEncoder<>(outputStream, leftEncoderFactory, rightEncoderFactory);
+  }
+
+  /**
+   * PairEncoder.
+   * @param <T1> type for the left coder.
+   * @param <T2> type for the right coder.
+   */
+  private final class PairEncoder<T1, T2> implements Encoder<Pair<T1, T2>> {
+
+    private final Encoder<T1> leftEncoder;
+    private final Encoder<T2> rightEncoder;
+
+    /**
+     * Constructor.
+     *
+     * @param outputStream the output stream to store the encoded bytes.
+     * @param leftEncoderFactory  the factory of the encoder to use for left elements.
+     * @param rightEncoderFactory the factory of the encoder to use for right elements.
+     * @throws IOException if fail to instantiate coders.
+     */
+    private PairEncoder(final OutputStream outputStream,
+                        final EncoderFactory<T1> leftEncoderFactory,
+                        final EncoderFactory<T2> rightEncoderFactory) throws IOException {
+      this.leftEncoder = leftEncoderFactory.create(outputStream);
+      this.rightEncoder = rightEncoderFactory.create(outputStream);
+    }
+
+    @Override
+    public void encode(final Pair<T1, T2> pair) throws IOException {
+      if (pair == null) {
+        throw new IOException("cannot encode a null pair");
+      }
+      leftEncoder.encode(pair.left());
+      rightEncoder.encode(pair.right());
+    }
+  }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java
index 6402319..441949e 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java
@@ -23,6 +23,7 @@ import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 public final class CompressionProperty extends EdgeExecutionProperty<CompressionProperty.Value> {
   /**
    * Constructor.
+   *
    * @param value value of the execution property.
    */
   private CompressionProperty(final Value value) {
@@ -31,6 +32,7 @@ public final class CompressionProperty extends EdgeExecutionProperty<Compression
 
   /**
    * Static method exposing the constructor.
+   *
    * @param value value of the new execution property.
    * @return the newly created execution property.
    */
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DecoderProperty.java
similarity index 76%
copy from common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java
copy to common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DecoderProperty.java
index aae1c6e..7ef507a 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DecoderProperty.java
@@ -15,19 +15,19 @@
  */
 package edu.snu.nemo.common.ir.edge.executionproperty;
 
-import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.coder.DecoderFactory;
 import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 
 /**
- * Coder ExecutionProperty.
+ * DecoderFactory ExecutionProperty.
  */
-public final class CoderProperty extends EdgeExecutionProperty<Coder> {
+public final class DecoderProperty extends EdgeExecutionProperty<DecoderFactory> {
   /**
    * Constructor.
    *
    * @param value value of the execution property.
    */
-  private CoderProperty(final Coder value) {
+  private DecoderProperty(final DecoderFactory value) {
     super(value);
   }
 
@@ -37,7 +37,7 @@ public final class CoderProperty extends EdgeExecutionProperty<Coder> {
    * @param value value of the new execution property.
    * @return the newly created execution property.
    */
-  public static CoderProperty of(final Coder value) {
-    return new CoderProperty(value);
+  public static DecoderProperty of(final DecoderFactory value) {
+    return new DecoderProperty(value);
   }
 }
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DecompressionProperty.java
similarity index 72%
copy from common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java
copy to common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DecompressionProperty.java
index aae1c6e..bfd09e0 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DecompressionProperty.java
@@ -15,19 +15,19 @@
  */
 package edu.snu.nemo.common.ir.edge.executionproperty;
 
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 
 /**
- * Coder ExecutionProperty.
+ * Decompression ExecutionProperty.
+ * It shares the value with {@link CompressionProperty}.
  */
-public final class CoderProperty extends EdgeExecutionProperty<Coder> {
+public final class DecompressionProperty extends EdgeExecutionProperty<CompressionProperty.Value> {
   /**
    * Constructor.
    *
    * @param value value of the execution property.
    */
-  private CoderProperty(final Coder value) {
+  private DecompressionProperty(final CompressionProperty.Value value) {
     super(value);
   }
 
@@ -37,7 +37,7 @@ public final class CoderProperty extends EdgeExecutionProperty<Coder> {
    * @param value value of the new execution property.
    * @return the newly created execution property.
    */
-  public static CoderProperty of(final Coder value) {
-    return new CoderProperty(value);
+  public static DecompressionProperty of(final CompressionProperty.Value value) {
+    return new DecompressionProperty(value);
   }
 }
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/EncoderProperty.java
similarity index 75%
rename from common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java
rename to common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/EncoderProperty.java
index aae1c6e..f214f17 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/EncoderProperty.java
@@ -15,19 +15,19 @@
  */
 package edu.snu.nemo.common.ir.edge.executionproperty;
 
-import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.coder.EncoderFactory;
 import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 
 /**
- * Coder ExecutionProperty.
+ * EncoderFactory ExecutionProperty.
  */
-public final class CoderProperty extends EdgeExecutionProperty<Coder> {
+public final class EncoderProperty extends EdgeExecutionProperty<EncoderFactory> {
   /**
    * Constructor.
    *
    * @param value value of the execution property.
    */
-  private CoderProperty(final Coder value) {
+  private EncoderProperty(final EncoderFactory value) {
     super(value);
   }
 
@@ -37,7 +37,7 @@ public final class CoderProperty extends EdgeExecutionProperty<Coder> {
    * @param value value of the new execution property.
    * @return the newly created execution property.
    */
-  public static CoderProperty of(final Coder value) {
-    return new CoderProperty(value);
+  public static EncoderProperty of(final EncoderFactory value) {
+    return new EncoderProperty(value);
   }
 }
diff --git a/common/src/test/java/edu/snu/nemo/common/ir/LoopVertexTest.java b/common/src/test/java/edu/snu/nemo/common/ir/LoopVertexTest.java
index 4650aae..558a9f7 100644
--- a/common/src/test/java/edu/snu/nemo/common/ir/LoopVertexTest.java
+++ b/common/src/test/java/edu/snu/nemo/common/ir/LoopVertexTest.java
@@ -15,7 +15,6 @@
  */
 package edu.snu.nemo.common.ir;
 
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
diff --git a/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java b/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
index f7eaad7..1ca18ff 100644
--- a/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
+++ b/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
@@ -15,15 +15,10 @@
  */
 package edu.snu.nemo.common.ir.executionproperty;
 
-import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.coder.EncoderFactory;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
-import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
-import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.*;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
@@ -66,8 +61,10 @@ public class ExecutionPropertyMapTest {
     assertEquals(DataStoreProperty.Value.MemoryStore, edgeMap.get(DataStoreProperty.class).get());
     edgeMap.put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
     assertEquals(DataFlowModelProperty.Value.Pull, edgeMap.get(DataFlowModelProperty.class).get());
-    edgeMap.put(CoderProperty.of(Coder.DUMMY_CODER));
-    assertEquals(Coder.DUMMY_CODER, edgeMap.get(CoderProperty.class).get());
+    edgeMap.put(EncoderProperty.of(EncoderFactory.DUMMY_ENCODER_FACTORY));
+    assertEquals(EncoderFactory.DUMMY_ENCODER_FACTORY, edgeMap.get(EncoderProperty.class).get());
+    edgeMap.put(DecoderProperty.of(DecoderFactory.DUMMY_DECODER_FACTORY));
+    assertEquals(DecoderFactory.DUMMY_DECODER_FACTORY, edgeMap.get(DecoderProperty.class).get());
 
     edgeMap.remove(DataFlowModelProperty.class);
     assertFalse(edgeMap.get(DataFlowModelProperty.class).isPresent());
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
index bf3253b..b85057b 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
@@ -15,9 +15,12 @@
  */
 package edu.snu.nemo.compiler.frontend.beam;
 
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.Pair;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.compiler.frontend.beam.coder.BeamCoder;
+import edu.snu.nemo.compiler.frontend.beam.coder.BeamDecoderFactory;
+import edu.snu.nemo.compiler.frontend.beam.coder.BeamEncoderFactory;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
@@ -55,10 +58,11 @@ public final class NemoPipelineVisitor extends Pipeline.PipelineVisitor.Defaults
   private final PipelineOptions options;
   // loopVertexStack keeps track of where the beam program is: whether it is inside a composite transform or it is not.
   private final Stack<LoopVertex> loopVertexStack;
-  private final Map<PValue, BeamCoder> pValueToCoder;
+  private final Map<PValue, Pair<BeamEncoderFactory, BeamDecoderFactory>> pValueToCoder;
 
   /**
    * Constructor of the BEAM Visitor.
+   *
    * @param builder DAGBuilder to build the DAG with.
    * @param options Pipeline options.
    */
@@ -96,10 +100,11 @@ public final class NemoPipelineVisitor extends Pipeline.PipelineVisitor.Defaults
       throw new UnsupportedOperationException(beamNode.toString());
     }
 
-    final IRVertex irVertex = convertToVertex(beamNode, builder, pValueToVertex, pValueToCoder, options,
-        loopVertexStack);
+    final IRVertex irVertex =
+        convertToVertex(beamNode, builder, pValueToVertex, pValueToCoder, options, loopVertexStack);
     beamNode.getOutputs().values().stream().filter(v -> v instanceof PCollection).map(v -> (PCollection) v)
-        .forEach(output -> pValueToCoder.put(output, new BeamCoder(output.getCoder())));
+        .forEach(output -> pValueToCoder.put(output,
+            Pair.of(new BeamEncoderFactory(output.getCoder()), new BeamDecoderFactory(output.getCoder()))));
 
     beamNode.getOutputs().values().forEach(output -> pValueToVertex.put(output, irVertex));
 
@@ -107,7 +112,9 @@ public final class NemoPipelineVisitor extends Pipeline.PipelineVisitor.Defaults
         .forEach(pValue -> {
           final IRVertex src = pValueToVertex.get(pValue);
           final IREdge edge = new IREdge(getEdgeCommunicationPattern(src, irVertex), src, irVertex);
-          edge.setProperty(CoderProperty.of(pValueToCoder.get(pValue)));
+          final Pair<BeamEncoderFactory, BeamDecoderFactory> coderPair = pValueToCoder.get(pValue);
+          edge.setProperty(EncoderProperty.of(coderPair.left()));
+          edge.setProperty(DecoderProperty.of(coderPair.right()));
           edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));
           this.builder.connectVertices(edge);
         });
@@ -115,22 +122,24 @@ public final class NemoPipelineVisitor extends Pipeline.PipelineVisitor.Defaults
 
   /**
    * Convert Beam node to IR vertex.
-   * @param beamNode input beam node.
-   * @param builder the DAG builder to add the vertex to.
-   * @param pValueToVertex PValue to Vertex map.
-   * @param pValueToCoder PValue to Coder map.
-   * @param options pipeline options.
+   *
+   * @param beamNode        input beam node.
+   * @param builder         the DAG builder to add the vertex to.
+   * @param pValueToVertex  PValue to Vertex map.
+   * @param pValueToCoder   PValue to EncoderFactory and DecoderFactory map.
+   * @param options         pipeline options.
    * @param loopVertexStack Stack to get the current loop vertex that the operator vertex will be assigned to.
-   * @param <I> input type.
-   * @param <O> output type.
+   * @param <I>             input type.
+   * @param <O>             output type.
    * @return newly created vertex.
    */
-  private static <I, O> IRVertex convertToVertex(final TransformHierarchy.Node beamNode,
-                                                 final DAGBuilder<IRVertex, IREdge> builder,
-                                                 final Map<PValue, IRVertex> pValueToVertex,
-                                                 final Map<PValue, BeamCoder> pValueToCoder,
-                                                 final PipelineOptions options,
-                                                 final Stack<LoopVertex> loopVertexStack) {
+  private static <I, O> IRVertex
+  convertToVertex(final TransformHierarchy.Node beamNode,
+                  final DAGBuilder<IRVertex, IREdge> builder,
+                  final Map<PValue, IRVertex> pValueToVertex,
+                  final Map<PValue, Pair<BeamEncoderFactory, BeamDecoderFactory>> pValueToCoder,
+                  final PipelineOptions options,
+                  final Stack<LoopVertex> loopVertexStack) {
     final PTransform beamTransform = beamNode.getTransform();
     final IRVertex irVertex;
     if (beamTransform instanceof Read.Bounded) {
@@ -147,13 +156,14 @@ public final class NemoPipelineVisitor extends Pipeline.PipelineVisitor.Defaults
       pValueToVertex.put(view.getView(), irVertex);
       builder.addVertex(irVertex, loopVertexStack);
       // Coders for outgoing edges in CreateViewTransform.
-      // Since outgoing PValues for CreateViewTransform is PCollectionView, we cannot use PCollection::getCoder to
-      // obtain coders.
+      // Since outgoing PValues for CreateViewTransform is PCollectionView,
+      // we cannot use PCollection::getCoder to obtain coders.
       final Coder beamInputCoder = beamNode.getInputs().values().stream()
           .filter(v -> v instanceof PCollection).map(v -> (PCollection) v).findFirst()
           .orElseThrow(() -> new RuntimeException("No inputs provided to " + beamNode.getFullName())).getCoder();
       beamNode.getOutputs().values().stream()
-          .forEach(output -> pValueToCoder.put(output, getCoderForView(view.getView().getViewFn(), beamInputCoder)));
+          .forEach(output ->
+              pValueToCoder.put(output, getCoderPairForView(view.getView().getViewFn(), beamInputCoder)));
     } else if (beamTransform instanceof Window) {
       final Window<I> window = (Window<I>) beamTransform;
       final WindowTransform transform = new WindowTransform(window.getWindowFn());
@@ -187,35 +197,40 @@ public final class NemoPipelineVisitor extends Pipeline.PipelineVisitor.Defaults
 
   /**
    * Connect side inputs to the vertex.
-   * @param builder the DAG builder to add the vertex to.
-   * @param sideInputs side inputs.
+   *
+   * @param builder        the DAG builder to add the vertex to.
+   * @param sideInputs     side inputs.
    * @param pValueToVertex PValue to Vertex map.
-   * @param pValueToCoder  PValue to Coder map.
-   * @param irVertex wrapper for a user operation in the IR. (Where the side input is headed to)
+   * @param pValueToCoder  PValue to Encoder/Decoder factory map.
+   * @param irVertex       wrapper for a user operation in the IR. (Where the side input is headed to)
    */
   private static void connectSideInputs(final DAGBuilder<IRVertex, IREdge> builder,
                                         final List<PCollectionView<?>> sideInputs,
                                         final Map<PValue, IRVertex> pValueToVertex,
-                                        final Map<PValue, BeamCoder> pValueToCoder,
+                                        final Map<PValue, Pair<BeamEncoderFactory, BeamDecoderFactory>> pValueToCoder,
                                         final IRVertex irVertex) {
     sideInputs.stream().filter(pValueToVertex::containsKey)
         .forEach(pValue -> {
           final IRVertex src = pValueToVertex.get(pValue);
           final IREdge edge = new IREdge(getEdgeCommunicationPattern(src, irVertex),
               src, irVertex, true);
-          edge.setProperty(CoderProperty.of(pValueToCoder.get(pValue)));
+          final Pair<BeamEncoderFactory, BeamDecoderFactory> coder = pValueToCoder.get(pValue);
+          edge.setProperty(EncoderProperty.of(coder.left()));
+          edge.setProperty(DecoderProperty.of(coder.right()));
           edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));
           builder.connectVertices(edge);
         });
   }
 
   /**
-   * Get appropriate coder for {@link PCollectionView}.
-   * @param viewFn {@link ViewFn} from the corresponding {@link View.CreatePCollectionView} transform
+   * Get appropriate encoder and decoder pair for {@link PCollectionView}.
+   *
+   * @param viewFn         {@link ViewFn} from the corresponding {@link View.CreatePCollectionView} transform
    * @param beamInputCoder Beam {@link Coder} for input value to {@link View.CreatePCollectionView}
-   * @return appropriate {@link BeamCoder}
+   * @return appropriate pair of {@link BeamEncoderFactory} and {@link BeamDecoderFactory}
    */
-  private static BeamCoder getCoderForView(final ViewFn viewFn, final Coder beamInputCoder) {
+  private static Pair<BeamEncoderFactory, BeamDecoderFactory> getCoderPairForView(final ViewFn viewFn,
+                                                                                  final Coder beamInputCoder) {
     final Coder beamOutputCoder;
     if (viewFn instanceof PCollectionViews.IterableViewFn) {
       beamOutputCoder = IterableCoder.of(beamInputCoder);
@@ -232,11 +247,12 @@ public final class NemoPipelineVisitor extends Pipeline.PipelineVisitor.Defaults
     } else {
       throw new UnsupportedOperationException("Unsupported viewFn: " + viewFn.getClass());
     }
-    return new BeamCoder(beamOutputCoder);
+    return Pair.of(new BeamEncoderFactory(beamOutputCoder), new BeamDecoderFactory(beamOutputCoder));
   }
 
   /**
    * Get the edge type for the src, dst vertex.
+   *
    * @param src source vertex.
    * @param dst destination vertex.
    * @return the appropriate edge type.
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamCoder.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamCoder.java
deleted file mode 100644
index 56f6714..0000000
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamCoder.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.compiler.frontend.beam.coder;
-
-import edu.snu.nemo.common.coder.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.VoidCoder;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * {@link Coder} from {@link org.apache.beam.sdk.coders.Coder}.
- * @param <T> element type.
- */
-public final class BeamCoder<T> implements Coder<T> {
-  private final org.apache.beam.sdk.coders.Coder<T> beamCoder;
-
-  /**
-   * Constructor of BeamCoder.
-   * @param beamCoder actual Beam coder to use.
-   */
-  public BeamCoder(final org.apache.beam.sdk.coders.Coder<T> beamCoder) {
-    this.beamCoder = beamCoder;
-  }
-
-  @Override
-  public void encode(final T value, final OutputStream outStream) throws IOException {
-    if (beamCoder instanceof VoidCoder) {
-      outStream.write(0);
-      return;
-    }
-    try {
-      beamCoder.encode(value, outStream);
-    } catch (final CoderException e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public T decode(final InputStream inStream) throws IOException {
-    if (beamCoder instanceof VoidCoder && inStream.read() == -1) {
-      throw new IOException("End of stream reached");
-    }
-    try {
-      return beamCoder.decode(inStream);
-    } catch (final CoderException e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public String toString() {
-    return beamCoder.toString();
-  }
-}
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java
new file mode 100644
index 0000000..7ebea38
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.frontend.beam.coder;
+
+import edu.snu.nemo.common.coder.DecoderFactory;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VoidCoder;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * {@link DecoderFactory} from {@link org.apache.beam.sdk.coders.Coder}.
+ * @param <T> the type of element to decode.
+ */
+public final class BeamDecoderFactory<T> implements DecoderFactory<T> {
+  private final Coder<T> beamCoder;
+
+  /**
+   * Constructor of BeamDecoderFactory.
+   *
+   * @param beamCoder actual Beam coder to use.
+   */
+  public BeamDecoderFactory(final Coder<T> beamCoder) {
+    this.beamCoder = beamCoder;
+  }
+
+  @Override
+  public Decoder<T> create(final InputStream inputStream) {
+    if (beamCoder instanceof VoidCoder) {
+      return new BeamVoidDecoder<>(inputStream, beamCoder);
+    } else {
+      return new BeamDecoder<>(inputStream, beamCoder);
+    }
+  }
+
+  /**
+   * Abstract class for Beam Decoder.
+   * @param <T2> the type of element to decode.
+   */
+  private abstract class BeamAbstractDecoder<T2> implements Decoder<T2> {
+
+    private final Coder<T2> beamCoder;
+    private final InputStream inputStream;
+
+    /**
+     * Constructor.
+     *
+     * @param inputStream the input stream to decode.
+     * @param beamCoder   the actual beam coder to use.
+     */
+    protected BeamAbstractDecoder(final InputStream inputStream,
+                                  final Coder<T2> beamCoder) {
+      this.inputStream = inputStream;
+      this.beamCoder = beamCoder;
+    }
+
+    /**
+     * Decode the actual data internally.
+     *
+     * @return the decoded data.
+     * @throws IOException if fail to decode.
+     */
+    protected T2 decodeInternal() throws IOException {
+      try {
+        return beamCoder.decode(inputStream);
+      } catch (final CoderException e) {
+        throw new IOException(e);
+      }
+    }
+
+    /**
+     * @return the input stream.
+     */
+    protected InputStream getInputStream() {
+      return inputStream;
+    }
+  }
+
+  /**
+   * Beam Decoder for non void objects.
+   * @param <T2> the type of element to decode.
+   */
+  private final class BeamDecoder<T2> extends BeamAbstractDecoder<T2> {
+
+    /**
+     * Constructor.
+     *
+     * @param inputStream the input stream to decode.
+     * @param beamCoder   the actual beam coder to use.
+     */
+    private BeamDecoder(final InputStream inputStream,
+                        final Coder<T2> beamCoder) {
+      super(inputStream, beamCoder);
+    }
+
+    @Override
+    public T2 decode() throws IOException {
+      return decodeInternal();
+    }
+  }
+
+  /**
+   * Beam Decoder for {@link VoidCoder}.
+   * @param <T2> the type of element to decode.
+   */
+  private final class BeamVoidDecoder<T2> extends BeamAbstractDecoder<T2> {
+
+    /**
+     * Constructor.
+     *
+     * @param inputStream the input stream to decode.
+     * @param beamCoder   the actual beam coder to use.
+     */
+    private BeamVoidDecoder(final InputStream inputStream,
+                            final Coder<T2> beamCoder) {
+      super(inputStream, beamCoder);
+    }
+
+    @Override
+    public T2 decode() throws IOException {
+      if (getInputStream().read() == -1) {
+        throw new IOException("End of stream reached");
+      }
+      return decodeInternal();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return beamCoder.toString();
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java
new file mode 100644
index 0000000..f67b96c
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.frontend.beam.coder;
+
+import edu.snu.nemo.common.coder.EncoderFactory;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VoidCoder;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * {@link EncoderFactory} from {@link Coder}.
+ * @param <T> the type of element to encode.
+ */
+public final class BeamEncoderFactory<T> implements EncoderFactory<T> {
+
+  private final Coder<T> beamCoder; // the Beam coder this factory was constructed with
+
+  /**
+   * Constructor of BeamEncoderFactory.
+   *
+   * @param beamCoder actual Beam coder to use.
+   */
+  public BeamEncoderFactory(final Coder<T> beamCoder) {
+    this.beamCoder = beamCoder;
+  }
+
+  @Override
+  public Encoder<T> create(final OutputStream outputStream) {
+    if (beamCoder instanceof VoidCoder) { // VoidCoder gets the placeholder-byte encoder instead of the Beam coder
+      return new BeamVoidEncoder<>(outputStream);
+    } else {
+      return new BeamEncoder<>(outputStream, beamCoder);
+    }
+  }
+
+  /**
+   * Beam Encoder for non void objects.
+   *
+   * @param <T2> the type of element to encode.
+   */
+  private final class BeamEncoder<T2> implements Encoder<T2> {
+
+    private final Coder<T2> beamCoder; // shadows the enclosing factory's field of the same name
+    private final OutputStream outputStream;
+
+    /**
+     * Constructor.
+     *
+     * @param outputStream the output stream to store the encoded bytes.
+     * @param beamCoder    the actual beam coder to use.
+     */
+    private BeamEncoder(final OutputStream outputStream,
+                        final Coder<T2> beamCoder) {
+      this.outputStream = outputStream;
+      this.beamCoder = beamCoder;
+    }
+
+    @Override
+    public void encode(final T2 element) throws IOException {
+      try {
+        beamCoder.encode(element, outputStream);
+      } catch (final CoderException e) { // rethrown wrapped in a plain IOException
+        throw new IOException(e);
+      }
+    }
+  }
+
+  /**
+   * Beam Encoder for {@link VoidCoder}.
+   *
+   * @param <T2> the type of element to encode.
+   */
+  private final class BeamVoidEncoder<T2> implements Encoder<T2> {
+
+    private final OutputStream outputStream;
+
+    /**
+     * Constructor.
+     *
+     * @param outputStream the output stream to store the encoded bytes.
+     */
+    private BeamVoidEncoder(final OutputStream outputStream) {
+      this.outputStream = outputStream;
+    }
+
+    @Override
+    public void encode(final T2 element) throws IOException {
+      outputStream.write(0); // write a single 0 byte as a placeholder so that emitted elements can be counted.
+    }
+  }
+
+  @Override
+  public String toString() {
+    return beamCoder.toString();
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java
index 9c63892..fa5d380 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java
@@ -20,7 +20,7 @@ import org.apache.beam.sdk.coders.AtomicCoder;
 import java.io.*;
 
 /**
- * Coder for float[].
+ * Beam {@link AtomicCoder} for float[].
  */
 public final class FloatArrayCoder extends AtomicCoder<float[]> {
   /**
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/IntArrayCoder.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/IntArrayCoder.java
index 80f53c0..c180bd6 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/IntArrayCoder.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/IntArrayCoder.java
@@ -20,7 +20,7 @@ import org.apache.beam.sdk.coders.AtomicCoder;
 import java.io.*;
 
 /**
- * Coder for int[].
+ * Beam {@link AtomicCoder} for int[].
  */
 public final class IntArrayCoder extends AtomicCoder<int[]> {
   /**
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkCoder.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkCoder.java
deleted file mode 100644
index 6317326..0000000
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkCoder.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.compiler.frontend.spark.coder;
-
-import edu.snu.nemo.common.coder.Coder;
-import org.apache.spark.serializer.Serializer;
-import scala.reflect.ClassTag$;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * Kryo Spark Coder for serialization.
- * @param <T> type of the object to (de)serialize.
- */
-public final class SparkCoder<T> implements Coder<T> {
-  private final Serializer serializer;
-
-  /**
-   * Default constructor.
-   * @param serializer kryo serializer.
-   */
-  public SparkCoder(final Serializer serializer) {
-    this.serializer = serializer;
-  }
-
-  @Override
-  public void encode(final T element, final OutputStream outStream) throws IOException {
-    serializer.newInstance().serializeStream(outStream).writeObject(element, ClassTag$.MODULE$.Any());
-  }
-
-  @Override
-  public T decode(final InputStream inStream) throws IOException {
-    final T obj = (T) serializer.newInstance().deserializeStream(inStream).readObject(ClassTag$.MODULE$.Any());
-    return obj;
-  }
-}
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkDecoderFactory.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkDecoderFactory.java
new file mode 100644
index 0000000..605b766
--- /dev/null
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkDecoderFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.frontend.spark.coder;
+
+import edu.snu.nemo.common.coder.DecoderFactory;
+import org.apache.spark.serializer.DeserializationStream;
+import org.apache.spark.serializer.Serializer;
+import org.apache.spark.serializer.SerializerInstance;
+import scala.reflect.ClassTag$;
+
+import java.io.InputStream;
+
+/**
+ * Spark DecoderFactory for deserialization.
+ * @param <T> type of the object to deserialize.
+ */
+public final class SparkDecoderFactory<T> implements DecoderFactory<T> {
+  private final Serializer serializer;
+
+  /**
+   * Constructor.
+   *
+   * @param serializer Spark serializer.
+   */
+  public SparkDecoderFactory(final Serializer serializer) {
+    this.serializer = serializer;
+  }
+
+  @Override
+  public Decoder<T> create(final InputStream inputStream) {
+    return new SparkDecoder<>(inputStream, serializer.newInstance()); // newInstance() runs on every create() call
+  }
+
+  /**
+   * Decoder that reads objects from a single Spark {@link DeserializationStream}.
+   * @param <T2> type of the object to deserialize.
+   */
+  private final class SparkDecoder<T2> implements Decoder<T2> {
+
+    private final DeserializationStream in; // opened once in the constructor, reused by every decode() call
+
+    /**
+     * Constructor.
+     *
+     * @param inputStream             the input stream to decode.
+     * @param sparkSerializerInstance the actual spark serializer instance to use.
+     */
+    private SparkDecoder(final InputStream inputStream,
+                         final SerializerInstance sparkSerializerInstance) {
+      this.in = sparkSerializerInstance.deserializeStream(inputStream);
+    }
+
+    @Override
+    public T2 decode() {
+      return (T2) in.readObject(ClassTag$.MODULE$.Any()); // unchecked cast: the object is read with the Any class tag
+    }
+  }
+}
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkEncoderFactory.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkEncoderFactory.java
new file mode 100644
index 0000000..970a46c
--- /dev/null
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkEncoderFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.frontend.spark.coder;
+
+import edu.snu.nemo.common.coder.EncoderFactory;
+import org.apache.spark.serializer.SerializationStream;
+import org.apache.spark.serializer.Serializer;
+import org.apache.spark.serializer.SerializerInstance;
+import scala.reflect.ClassTag$;
+
+import java.io.OutputStream;
+
+/**
+ * Spark EncoderFactory for serialization.
+ * @param <T> type of the object to serialize.
+ */
+public final class SparkEncoderFactory<T> implements EncoderFactory<T> {
+  private final Serializer serializer;
+
+  /**
+   * Constructor.
+   *
+   * @param serializer Spark serializer.
+   */
+  public SparkEncoderFactory(final Serializer serializer) {
+    this.serializer = serializer;
+  }
+
+  @Override
+  public Encoder<T> create(final OutputStream outputStream) {
+    return new SparkEncoder<>(outputStream, serializer.newInstance()); // newInstance() runs on every create() call
+  }
+
+  /**
+   * Encoder that writes objects to a single Spark {@link SerializationStream}.
+   * @param <T2> type of the object to serialize.
+   */
+  private final class SparkEncoder<T2> implements Encoder<T2> {
+
+    private final SerializationStream out; // opened once in the constructor, reused by every encode() call
+
+    /**
+     * Constructor.
+     *
+     * @param outputStream            the output stream to store the encoded bytes.
+     * @param sparkSerializerInstance the actual spark serializer instance to use.
+     */
+    private SparkEncoder(final OutputStream outputStream,
+                         final SerializerInstance sparkSerializerInstance) {
+      this.out = sparkSerializerInstance.serializeStream(outputStream);
+    }
+
+    @Override
+    public void encode(final T2 element) {
+      out.writeObject(element, ClassTag$.MODULE$.Any()); // written with the Any class tag
+    }
+  }
+}
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
index 6c4c395..08731a4 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
@@ -19,14 +19,16 @@ import edu.snu.nemo.client.JobLauncher;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.KeyExtractorProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.LoopVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import edu.snu.nemo.compiler.frontend.spark.SparkKeyExtractor;
-import edu.snu.nemo.compiler.frontend.spark.coder.SparkCoder;
+import edu.snu.nemo.compiler.frontend.spark.coder.SparkDecoderFactory;
+import edu.snu.nemo.compiler.frontend.spark.coder.SparkEncoderFactory;
 import edu.snu.nemo.compiler.frontend.spark.transform.CollectTransform;
 import edu.snu.nemo.compiler.frontend.spark.transform.GroupByKeyTransform;
 import edu.snu.nemo.compiler.frontend.spark.transform.ReduceByKeyTransform;
@@ -87,8 +89,10 @@ public final class SparkFrontendUtils {
    * @param <T>             type of the return data.
    * @return the data collected.
    */
-  public static <T> List<T> collect(final DAG<IRVertex, IREdge> dag, final Stack<LoopVertex> loopVertexStack,
-                                    final IRVertex lastVertex, final Serializer serializer) {
+  public static <T> List<T> collect(final DAG<IRVertex, IREdge> dag,
+                                    final Stack<LoopVertex> loopVertexStack,
+                                    final IRVertex lastVertex,
+                                    final Serializer serializer) {
     final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>(dag);
 
     // save result in a temporary file
@@ -100,7 +104,8 @@ public final class SparkFrontendUtils {
 
     final IREdge newEdge = new IREdge(getEdgeCommunicationPattern(lastVertex, collectVertex),
         lastVertex, collectVertex);
-    newEdge.setProperty(CoderProperty.of(new SparkCoder(serializer)));
+    newEdge.setProperty(EncoderProperty.of(new SparkEncoderFactory(serializer)));
+    newEdge.setProperty(DecoderProperty.of(new SparkDecoderFactory(serializer)));
     newEdge.setProperty(SPARK_KEY_EXTRACTOR_PROP);
     builder.connectVertices(newEdge);
 
diff --git a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala
index 5065f9c..14f764a 100644
--- a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala
+++ b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala
@@ -19,11 +19,11 @@ import java.util
 
 import edu.snu.nemo.common.dag.DAGBuilder
 import edu.snu.nemo.common.ir.edge.IREdge
-import edu.snu.nemo.common.ir.edge.executionproperty.{CoderProperty, KeyExtractorProperty}
+import edu.snu.nemo.common.ir.edge.executionproperty.{DecoderProperty, EncoderProperty, KeyExtractorProperty}
 import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty
 import edu.snu.nemo.common.ir.vertex.{IRVertex, LoopVertex, OperatorVertex}
 import edu.snu.nemo.compiler.frontend.spark.SparkKeyExtractor
-import edu.snu.nemo.compiler.frontend.spark.coder.SparkCoder
+import edu.snu.nemo.compiler.frontend.spark.coder.{SparkDecoderFactory, SparkEncoderFactory}
 import edu.snu.nemo.compiler.frontend.spark.core.SparkFrontendUtils
 import edu.snu.nemo.compiler.frontend.spark.transform.ReduceByKeyTransform
 import org.apache.hadoop.conf.Configuration
@@ -72,7 +72,10 @@ final class PairRDDFunctions[K: ClassTag, V: ClassTag] protected[rdd] (
     val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(self.lastVertex, reduceByKeyVertex),
       self.lastVertex, reduceByKeyVertex)
     newEdge.setProperty(
-      CoderProperty.of(new SparkCoder[Tuple2[K, V]](self.serializer))
+      EncoderProperty.of(new SparkEncoderFactory[Tuple2[K, V]](self.serializer))
+        .asInstanceOf[EdgeExecutionProperty[_ <: Serializable]])
+    newEdge.setProperty(
+      DecoderProperty.of(new SparkDecoderFactory[Tuple2[K, V]](self.serializer))
         .asInstanceOf[EdgeExecutionProperty[_ <: Serializable]])
     newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor))
     builder.connectVertices(newEdge)
diff --git a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
index 135f698..ad58399 100644
--- a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
+++ b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
@@ -20,11 +20,11 @@ import java.util
 import edu.snu.nemo.client.JobLauncher
 import edu.snu.nemo.common.dag.{DAG, DAGBuilder}
 import edu.snu.nemo.common.ir.edge.IREdge
-import edu.snu.nemo.common.ir.edge.executionproperty.{CoderProperty, KeyExtractorProperty}
+import edu.snu.nemo.common.ir.edge.executionproperty.{DecoderProperty, EncoderProperty, KeyExtractorProperty}
 import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty
 import edu.snu.nemo.common.ir.vertex.{IRVertex, LoopVertex, OperatorVertex}
 import edu.snu.nemo.compiler.frontend.spark.SparkKeyExtractor
-import edu.snu.nemo.compiler.frontend.spark.coder.SparkCoder
+import edu.snu.nemo.compiler.frontend.spark.coder.{SparkDecoderFactory, SparkEncoderFactory}
 import edu.snu.nemo.compiler.frontend.spark.core.SparkFrontendUtils
 import edu.snu.nemo.compiler.frontend.spark.transform._
 import org.apache.hadoop.io.WritableFactory
@@ -51,8 +51,10 @@ final class RDD[T: ClassTag] protected[rdd] (
 
   protected[rdd] val serializer: Serializer = SparkFrontendUtils.deriveSerializerFrom(_sc)
   private val loopVertexStack = new util.Stack[LoopVertex]
-  private val coderProperty: EdgeExecutionProperty[_ <: Serializable] =
-    CoderProperty.of(new SparkCoder[T](serializer)).asInstanceOf[EdgeExecutionProperty[_ <: Serializable]]
+  private val encoderProperty: EdgeExecutionProperty[_ <: Serializable] =
+    EncoderProperty.of(new SparkEncoderFactory[T](serializer)).asInstanceOf[EdgeExecutionProperty[_ <: Serializable]]
+  private val decoderProperty: EdgeExecutionProperty[_ <: Serializable] =
+    DecoderProperty.of(new SparkDecoderFactory[T](serializer)).asInstanceOf[EdgeExecutionProperty[_ <: Serializable]]
   private val keyExtractorProperty: KeyExtractorProperty = KeyExtractorProperty.of(new SparkKeyExtractor)
 
   /**
@@ -137,7 +139,8 @@ final class RDD[T: ClassTag] protected[rdd] (
 
     val newEdge: IREdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, mapVertex),
       lastVertex, mapVertex)
-    newEdge.setProperty(coderProperty)
+    newEdge.setProperty(encoderProperty)
+    newEdge.setProperty(decoderProperty)
     newEdge.setProperty(keyExtractorProperty)
     builder.connectVertices(newEdge)
 
@@ -156,7 +159,8 @@ final class RDD[T: ClassTag] protected[rdd] (
 
     val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, flatMapVertex),
       lastVertex, flatMapVertex)
-    newEdge.setProperty(coderProperty)
+    newEdge.setProperty(encoderProperty)
+    newEdge.setProperty(decoderProperty)
     newEdge.setProperty(keyExtractorProperty)
     builder.connectVertices(newEdge)
 
@@ -186,7 +190,8 @@ final class RDD[T: ClassTag] protected[rdd] (
 
     val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, reduceVertex),
       lastVertex, reduceVertex)
-    newEdge.setProperty(coderProperty)
+    newEdge.setProperty(encoderProperty)
+    newEdge.setProperty(decoderProperty)
     newEdge.setProperty(keyExtractorProperty)
 
     builder.connectVertices(newEdge)
@@ -210,7 +215,8 @@ final class RDD[T: ClassTag] protected[rdd] (
     builder.addVertex(flatMapVertex, loopVertexStack)
     val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, flatMapVertex),
       lastVertex, flatMapVertex)
-    newEdge.setProperty(coderProperty)
+    newEdge.setProperty(encoderProperty)
+    newEdge.setProperty(decoderProperty)
     newEdge.setProperty(keyExtractorProperty)
 
     builder.connectVertices(newEdge)
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
index 1d1a651..0ffddaa 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
@@ -50,6 +50,7 @@ public final class CompressionPass extends AnnotatingPass {
     dag.topologicalDo(vertex -> dag.getIncomingEdgesOf(vertex).stream()
         .filter(e -> !vertex.getPropertyValue(StageIdProperty.class).get()
             .equals(e.getSrc().getPropertyValue(StageIdProperty.class).get()))
+        .filter(edge -> !edge.getPropertyValue(CompressionProperty.class).isPresent())
         .forEach(edge -> edge.setProperty(CompressionProperty.of(compression))));
 
     return dag;
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java
similarity index 67%
copy from compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
copy to compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java
index 1d1a651..6ec2fde 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java
@@ -18,31 +18,22 @@ package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecompressionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
 
 
 /**
- * A pass for applying compression algorithm for data flowing between vertices.
+ * A pass for applying decompression algorithm for data flowing between vertices.
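+ * It always applies the value of the edge's compression property, skipping edges that already have a decompression property.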
  */
-public final class CompressionPass extends AnnotatingPass {
-  private final CompressionProperty.Value compression;
-
-  /**
-   * Default constructor. Uses LZ4 as default.
-   */
-  public CompressionPass() {
-    super(CompressionProperty.class);
-    this.compression = CompressionProperty.Value.LZ4;
-  }
+public final class DecompressionPass extends AnnotatingPass {
 
   /**
    * Constructor.
-   * @param compression Compression to apply on edges.
    */
-  public CompressionPass(final CompressionProperty.Value compression) {
+  public DecompressionPass() {
     super(CompressionProperty.class);
-    this.compression = compression;
   }
 
   @Override
@@ -50,7 +41,11 @@ public final class CompressionPass extends AnnotatingPass {
     dag.topologicalDo(vertex -> dag.getIncomingEdgesOf(vertex).stream()
         .filter(e -> !vertex.getPropertyValue(StageIdProperty.class).get()
             .equals(e.getSrc().getPropertyValue(StageIdProperty.class).get()))
-        .forEach(edge -> edge.setProperty(CompressionProperty.of(compression))));
+        // Find edges which have a compression property but not decompression property.
+        .filter(edge -> edge.getPropertyValue(CompressionProperty.class).isPresent()
+            && !edge.getPropertyValue(DecompressionProperty.class).isPresent())
+        .forEach(edge -> edge.setProperty(DecompressionProperty.of(
+            edge.getPropertyValue(CompressionProperty.class).get()))));
 
     return dag;
   }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeDecoderPass.java
similarity index 64%
copy from compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java
copy to compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeDecoderPass.java
index eade93d..2283bfb 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeDecoderPass.java
@@ -15,32 +15,33 @@
  */
 package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
 
-import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.coder.DecoderFactory;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 
 /**
- * Pass for initiating IREdge Coder ExecutionProperty with default dummy coder.
+ * Pass for initializing IREdge Decoder ExecutionProperty with the default dummy decoder factory.
  */
-public final class DefaultEdgeCoderPass extends AnnotatingPass {
+public final class DefaultEdgeDecoderPass extends AnnotatingPass {
 
-  private static final CoderProperty DEFAULT_CODER_PROPERTY = CoderProperty.of(Coder.DUMMY_CODER);
+  private static final DecoderProperty DEFAULT_DECODER_PROPERTY =
+      DecoderProperty.of(DecoderFactory.DUMMY_DECODER_FACTORY);
 
   /**
    * Default constructor.
    */
-  public DefaultEdgeCoderPass() {
-    super(CoderProperty.class);
+  public DefaultEdgeDecoderPass() {
+    super(DecoderProperty.class);
   }
 
   @Override
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
     dag.topologicalDo(irVertex ->
         dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
-          if (!irEdge.getPropertyValue(CoderProperty.class).isPresent()) {
-            irEdge.setProperty(DEFAULT_CODER_PROPERTY);
+          if (!irEdge.getPropertyValue(DecoderProperty.class).isPresent()) {
+            irEdge.setProperty(DEFAULT_DECODER_PROPERTY);
           }
         }));
     return dag;
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeEncoderPass.java
similarity index 64%
rename from compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java
rename to compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeEncoderPass.java
index eade93d..0238c72 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeEncoderPass.java
@@ -15,32 +15,33 @@
  */
 package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
 
-import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.coder.EncoderFactory;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 
 /**
- * Pass for initiating IREdge Coder ExecutionProperty with default dummy coder.
+ * Pass for initializing IREdge Encoder ExecutionProperty with the default dummy encoder factory.
  */
-public final class DefaultEdgeCoderPass extends AnnotatingPass {
+public final class DefaultEdgeEncoderPass extends AnnotatingPass {
 
-  private static final CoderProperty DEFAULT_CODER_PROPERTY = CoderProperty.of(Coder.DUMMY_CODER);
+  private static final EncoderProperty DEFAULT_ENCODER_PROPERTY =
+      EncoderProperty.of(EncoderFactory.DUMMY_ENCODER_FACTORY);
 
   /**
    * Default constructor.
    */
-  public DefaultEdgeCoderPass() {
-    super(CoderProperty.class);
+  public DefaultEdgeEncoderPass() {
+    super(EncoderProperty.class);
   }
 
   @Override
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
     dag.topologicalDo(irVertex ->
         dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
-          if (!irEdge.getPropertyValue(CoderProperty.class).isPresent()) {
-            irEdge.setProperty(DEFAULT_CODER_PROPERTY);
+          if (!irEdge.getPropertyValue(EncoderProperty.class).isPresent()) { // keep an encoder that is already set
+            irEdge.setProperty(DEFAULT_ENCODER_PROPERTY);
           }
         }));
     return dag;
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
index adf862a..b0ab2b1 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
@@ -31,12 +31,14 @@ public final class PrimitiveCompositePass extends CompositePass {
   public PrimitiveCompositePass() {
     super(Arrays.asList(
         new DefaultParallelismPass(), // annotating after reshaping passes, before stage partitioning
-        new DefaultEdgeCoderPass(),
+        new DefaultEdgeEncoderPass(),
+        new DefaultEdgeDecoderPass(),
         new DefaultStagePartitioningPass(),
         new ReviseInterStageEdgeDataStorePass(), // after stage partitioning
         new DefaultEdgeUsedDataHandlingPass(),
         new ScheduleGroupPass(),
-        new CompressionPass()
+        new CompressionPass(),
+        new DecompressionPass()
     ));
   }
 }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
index 64b1060..eef0b20 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
@@ -15,9 +15,11 @@
  */
 package edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping;
 
-import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.coder.EncoderFactory;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
@@ -150,9 +152,13 @@ public final class CommonSubexpressionEliminationPass extends ReshapingPass {
               outListToModify.remove(e);
               final IREdge newIrEdge = new IREdge(e.getPropertyValue(DataCommunicationPatternProperty.class).get(),
                   operatorVertexToUse, e.getDst());
-              final Optional<Coder> coderProperty = e.getPropertyValue(CoderProperty.class);
-              if (coderProperty.isPresent()) {
-                newIrEdge.setProperty(CoderProperty.of(coderProperty.get()));
+              final Optional<EncoderFactory> encoderProperty = e.getPropertyValue(EncoderProperty.class);
+              if (encoderProperty.isPresent()) {
+                newIrEdge.setProperty(EncoderProperty.of(encoderProperty.get()));
+              }
+              final Optional<DecoderFactory> decoderProperty = e.getPropertyValue(DecoderProperty.class);
+              if (decoderProperty.isPresent()) {
+                newIrEdge.setProperty(DecoderProperty.of(decoderProperty.get()));
               }
               outListToModify.add(newIrEdge);
             });
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java
index 4139264..1117a7c 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java
@@ -18,7 +18,8 @@ package edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
@@ -66,7 +67,8 @@ public final class DataSkewReshapingPass extends ReshapingPass {
             // We then insert the dynamicOptimizationVertex between the vertex and incoming vertices.
             final IREdge newEdge = new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
                 edge.getSrc(), metricCollectionBarrierVertex);
-            newEdge.setProperty(CoderProperty.of(edge.getPropertyValue(CoderProperty.class).get()));
+            newEdge.setProperty(EncoderProperty.of(edge.getPropertyValue(EncoderProperty.class).get()));
+            newEdge.setProperty(DecoderProperty.of(edge.getPropertyValue(DecoderProperty.class).get()));
 
             final IREdge edgeToGbK = new IREdge(edge.getPropertyValue(DataCommunicationPatternProperty.class).get(),
                 metricCollectionBarrierVertex, v, edge.isSideInput());
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
index 10d76e1..072c163 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
@@ -17,7 +17,8 @@ package edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping;
 
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.LoopVertex;
 import edu.snu.nemo.common.dag.DAG;
@@ -288,7 +289,8 @@ public final class LoopOptimizations {
                 edgesToRemove.add(edge);
                 final IREdge newEdge = new IREdge(edge.getPropertyValue(DataCommunicationPatternProperty.class).get(),
                     candidate.getKey(), edge.getDst(), edge.isSideInput());
-                newEdge.setProperty(CoderProperty.of(edge.getPropertyValue(CoderProperty.class).get()));
+                newEdge.setProperty(EncoderProperty.of(edge.getPropertyValue(EncoderProperty.class).get()));
+                newEdge.setProperty(DecoderProperty.of(edge.getPropertyValue(DecoderProperty.class).get()));
                 edgesToAdd.add(newEdge);
               });
           final List<IREdge> listToModify = inEdges.getOrDefault(loopVertex, new ArrayList<>());
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
index e899da9..9bf434a 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
@@ -17,7 +17,8 @@ package edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping;
 
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
@@ -63,7 +64,8 @@ public final class SailfishRelayReshapingPass extends ReshapingPass {
             edge.copyExecutionPropertiesTo(newEdgeToMerger);
             final IREdge newEdgeFromMerger = new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
                 iFileMergerVertex, v);
-            newEdgeFromMerger.setProperty(CoderProperty.of(edge.getPropertyValue(CoderProperty.class).get()));
+            newEdgeFromMerger.setProperty(EncoderProperty.of(edge.getPropertyValue(EncoderProperty.class).get()));
+            newEdgeFromMerger.setProperty(DecoderProperty.of(edge.getPropertyValue(DecoderProperty.class).get()));
             builder.connectVertices(newEdgeToMerger);
             builder.connectVertices(newEdgeFromMerger);
           } else {
diff --git a/examples/spark/src/main/java/edu/snu/nemo/examples/spark/sql/JavaUserDefinedTypedAggregation.java b/examples/spark/src/main/java/edu/snu/nemo/examples/spark/sql/JavaUserDefinedTypedAggregation.java
index 00d2cb3..c6bec6a 100644
--- a/examples/spark/src/main/java/edu/snu/nemo/examples/spark/sql/JavaUserDefinedTypedAggregation.java
+++ b/examples/spark/src/main/java/edu/snu/nemo/examples/spark/sql/JavaUserDefinedTypedAggregation.java
@@ -193,7 +193,7 @@ public final class JavaUserDefinedTypedAggregation {
     }
 
     /**
-     * Specifies the Encoder for the intermediate value type.
+     * Specifies the EncoderFactory for the intermediate value type.
      *
      * @return buffer encoder.
      */
@@ -202,7 +202,7 @@ public final class JavaUserDefinedTypedAggregation {
     }
 
     /**
-     * Specifies the Encoder for the final output value type.
+     * Specifies the EncoderFactory for the final output value type.
      *
      * @return output encoder.
      */
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
index c9d3cc2..69f5a17 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
@@ -17,7 +17,9 @@ package edu.snu.nemo.runtime.executor;
 
 import com.google.protobuf.ByteString;
 import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecompressionProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.conf.JobConf;
@@ -108,15 +110,21 @@ public final class Executor {
           new TaskStateManager(task, executorId, persistentConnectionToMasterMap, metricMessageSender);
 
       task.getTaskIncomingEdges().forEach(e -> serializerManager.register(e.getId(),
-          e.getPropertyValue(CoderProperty.class).get(), e.getPropertyValue(CompressionProperty.class)
-              .orElse(null)));
+          e.getPropertyValue(EncoderProperty.class).get(),
+          e.getPropertyValue(DecoderProperty.class).get(),
+          e.getPropertyValue(CompressionProperty.class).orElse(null),
+          e.getPropertyValue(DecompressionProperty.class).orElse(null)));
       task.getTaskOutgoingEdges().forEach(e -> serializerManager.register(e.getId(),
-          e.getPropertyValue(CoderProperty.class).get(), e.getPropertyValue(CompressionProperty.class).
-              orElse(null)));
+          e.getPropertyValue(EncoderProperty.class).get(),
+          e.getPropertyValue(DecoderProperty.class).get(),
+          e.getPropertyValue(CompressionProperty.class).orElse(null),
+          e.getPropertyValue(DecompressionProperty.class).orElse(null)));
       irDag.getVertices().forEach(v -> {
         irDag.getOutgoingEdgesOf(v).forEach(e -> serializerManager.register(e.getId(),
-            e.getPropertyValue(CoderProperty.class).get(), e.getPropertyValue(CompressionProperty.class)
-                .orElse(null)));
+            e.getPropertyValue(EncoderProperty.class).get(),
+            e.getPropertyValue(DecoderProperty.class).get(),
+            e.getPropertyValue(CompressionProperty.class).orElse(null),
+            e.getPropertyValue(DecompressionProperty.class).orElse(null)));
       });
 
       new TaskExecutor(
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
index ee0aa46..69eb9ba 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
@@ -17,10 +17,12 @@ package edu.snu.nemo.runtime.executor.data;
 
 import com.google.common.io.CountingInputStream;
 import edu.snu.nemo.common.DirectByteArrayOutputStream;
-import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.coder.EncoderFactory;
 import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
 import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
-import edu.snu.nemo.runtime.executor.data.streamchainer.StreamChainer;
+import edu.snu.nemo.runtime.executor.data.streamchainer.DecodeStreamChainer;
+import edu.snu.nemo.runtime.executor.data.streamchainer.EncodeStreamChainer;
 import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,18 +49,19 @@ public final class DataUtil {
   /**
    * Serializes the elements in a non-serialized partition into an output stream.
    *
-   * @param coder                  the coder to encode the elements.
+   * @param encoderFactory         the encoder factory used to create the encoder for the elements.
    * @param nonSerializedPartition the non-serialized partition to serialize.
    * @param bytesOutputStream      the output stream to write.
    * @return total number of elements in the partition.
    * @throws IOException if fail to serialize.
    */
-  public static long serializePartition(final Coder coder,
+  public static long serializePartition(final EncoderFactory encoderFactory,
                                         final NonSerializedPartition nonSerializedPartition,
                                         final OutputStream bytesOutputStream) throws IOException {
     long elementsCount = 0;
+    final EncoderFactory.Encoder encoder = encoderFactory.create(bytesOutputStream);
     for (final Object element : nonSerializedPartition.getData()) {
-      coder.encode(element, bytesOutputStream);
+      encoder.encode(element);
       elementsCount++;
     }
 
@@ -77,9 +80,10 @@ public final class DataUtil {
    * @throws IOException if fail to deserialize.
    */
   public static <K extends Serializable> NonSerializedPartition deserializePartition(final long elementsInPartition,
-                                                            final Serializer serializer,
-                                                            final K key,
-                                                            final InputStream inputStream) throws IOException {
+                                                                                     final Serializer serializer,
+                                                                                     final K key,
+                                                                                     final InputStream inputStream)
+      throws IOException {
     final List deserializedData = new ArrayList();
     final InputStreamIterator iterator = new InputStreamIterator(Collections.singletonList(inputStream).iterator(),
         serializer, elementsInPartition);
@@ -105,9 +109,10 @@ public final class DataUtil {
     for (final NonSerializedPartition<K> partitionToConvert : partitionsToConvert) {
       try (
           final DirectByteArrayOutputStream bytesOutputStream = new DirectByteArrayOutputStream();
-          final OutputStream wrappedStream = buildOutputStream(bytesOutputStream, serializer.getStreamChainers());
+          final OutputStream wrappedStream = buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers());
       ) {
-        final long elementsTotal = serializePartition(serializer.getCoder(), partitionToConvert, wrappedStream);
+        final long elementsTotal =
+            serializePartition(serializer.getEncoderFactory(), partitionToConvert, wrappedStream);
         // We need to close wrappedStream on here, because DirectByteArrayOutputStream:getBufDirectly() returns
         // inner buffer directly, which can be an unfinished(not flushed) buffer.
         wrappedStream.close();
@@ -190,13 +195,14 @@ public final class DataUtil {
   }
 
   /**
-   * An iterator that emits objects from {@link InputStream} using the corresponding {@link Coder}.
+   * An iterator that emits objects from {@link InputStream} using the corresponding {@link DecoderFactory}.
+   *
    * @param <T> The type of elements.
    */
   public static final class InputStreamIterator<T> implements IteratorWithNumBytes<T> {
 
     private final Iterator<InputStream> inputStreams;
-    private final Serializer<T> serializer;
+    private final Serializer<?, T> serializer;
     private final long limit;
 
     private volatile CountingInputStream serializedCountingStream = null;
@@ -204,17 +210,19 @@ public final class DataUtil {
     private volatile boolean hasNext = false;
     private volatile T next;
     private volatile boolean cannotContinueDecoding = false;
+    private volatile DecoderFactory.Decoder<T> decoder = null;
     private volatile long elementsDecoded = 0;
     private volatile long numSerializedBytes = 0;
     private volatile long numEncodedBytes = 0;
 
     /**
-     * Construct {@link Iterator} from {@link InputStream} and {@link Coder}.
+     * Construct {@link Iterator} from {@link InputStream} and {@link DecoderFactory}.
      *
      * @param inputStreams The streams to read data from.
      * @param serializer   The serializer.
      */
-    public InputStreamIterator(final Iterator<InputStream> inputStreams, final Serializer<T> serializer) {
+    public InputStreamIterator(final Iterator<InputStream> inputStreams,
+                               final Serializer<?, T> serializer) {
       this.inputStreams = inputStreams;
       this.serializer = serializer;
       // -1 means no limit.
@@ -222,7 +230,7 @@ public final class DataUtil {
     }
 
     /**
-     * Construct {@link Iterator} from {@link InputStream} and {@link Coder}.
+     * Construct {@link Iterator} from {@link InputStream} and {@link DecoderFactory}.
      *
      * @param inputStreams The streams to read data from.
      * @param serializer   The serializer.
@@ -230,7 +238,7 @@ public final class DataUtil {
      */
     public InputStreamIterator(
         final Iterator<InputStream> inputStreams,
-        final Serializer<T> serializer,
+        final Serializer<?, T> serializer,
         final long limit) {
       if (limit < 0) {
         throw new IllegalArgumentException("Negative limit not allowed.");
@@ -254,11 +262,12 @@ public final class DataUtil {
       }
       while (true) {
         try {
-          if (encodedCountingStream == null) {
+          if (decoder == null) {
             if (inputStreams.hasNext()) {
               serializedCountingStream = new CountingInputStream(inputStreams.next());
               encodedCountingStream = new CountingInputStream(buildInputStream(
-                  serializedCountingStream, serializer.getStreamChainers()));
+                  serializedCountingStream, serializer.getDecodeStreamChainers()));
+              decoder = serializer.getDecoderFactory().create(encodedCountingStream);
             } else {
               cannotContinueDecoding = true;
               return false;
@@ -269,7 +278,7 @@ public final class DataUtil {
           throw new RuntimeException(e);
         }
         try {
-          next = serializer.getCoder().decode(encodedCountingStream);
+          next = decoder.decode();
           hasNext = true;
           elementsDecoded++;
           return true;
@@ -279,6 +288,7 @@ public final class DataUtil {
           numEncodedBytes += encodedCountingStream.getCount();
           serializedCountingStream = null;
           encodedCountingStream = null;
+          decoder = null;
         }
       }
     }
@@ -313,50 +323,54 @@ public final class DataUtil {
   }
 
   /**
-   * Chain {@link InputStream} with {@link StreamChainer}s.
+   * Chain {@link InputStream} with {@link DecodeStreamChainer}s.
    *
-   * @param in             the {@link InputStream}.
-   * @param streamChainers the list of {@link StreamChainer} to be applied on the stream.
-   * @return chained       {@link InputStream}.
-   * @throws IOException   if fail to create new stream.
+   * @param in                   the {@link InputStream}.
+   * @param decodeStreamChainers the list of {@link DecodeStreamChainer} to be applied on the stream.
+   * @return chained {@link InputStream}.
+   * @throws IOException if fail to create new stream.
    */
-  public static InputStream buildInputStream(final InputStream in, final List<StreamChainer> streamChainers)
-  throws IOException {
+  public static InputStream buildInputStream(final InputStream in,
+                                             final List<DecodeStreamChainer> decodeStreamChainers)
+      throws IOException {
     InputStream chained = in;
-    for (final StreamChainer streamChainer : streamChainers) {
-      chained = streamChainer.chainInput(chained);
+    for (final DecodeStreamChainer encodeStreamChainer : decodeStreamChainers) {
+      chained = encodeStreamChainer.chainInput(chained);
     }
     return chained;
   }
 
   /**
-   * Chain {@link OutputStream} with {@link StreamChainer}s.
+   * Chain {@link OutputStream} with {@link EncodeStreamChainer}s.
    *
-   * @param out            the {@link OutputStream}.
-   * @param streamChainers the list of {@link StreamChainer} to be applied on the stream.
-   * @return chained       {@link OutputStream}.
-   * @throws IOException   if fail to create new stream.
+   * @param out                  the {@link OutputStream}.
+   * @param encodeStreamChainers the list of {@link EncodeStreamChainer} to be applied on the stream.
+   * @return chained {@link OutputStream}.
+   * @throws IOException if fail to create new stream.
    */
-  public static OutputStream buildOutputStream(final OutputStream out, final List<StreamChainer> streamChainers)
-  throws IOException {
+  public static OutputStream buildOutputStream(final OutputStream out,
+                                               final List<EncodeStreamChainer> encodeStreamChainers)
+      throws IOException {
     OutputStream chained = out;
-    final List<StreamChainer> temporaryStreamChainerList = new ArrayList<>(streamChainers);
-    Collections.reverse(temporaryStreamChainerList);
-    for (final StreamChainer streamChainer : temporaryStreamChainerList) {
-      chained = streamChainer.chainOutput(chained);
+    final List<EncodeStreamChainer> temporaryEncodeStreamChainerList = new ArrayList<>(encodeStreamChainers);
+    Collections.reverse(temporaryEncodeStreamChainerList);
+    for (final EncodeStreamChainer encodeStreamChainer : temporaryEncodeStreamChainerList) {
+      chained = encodeStreamChainer.chainOutput(chained);
     }
     return chained;
   }
 
   /**
    * {@link Iterator} with interface to access to the number of bytes.
+   *
    * @param <T> the type of decoded object
    */
   public interface IteratorWithNumBytes<T> extends Iterator<T> {
     /**
      * Create an {@link IteratorWithNumBytes}, with no information about the number of bytes.
+     *
      * @param innerIterator {@link Iterator} to wrap
-     * @param <E> the type of decoded object
+     * @param <E>           the type of decoded object
      * @return an {@link IteratorWithNumBytes}, with no information about the number of bytes
      */
     static <E> IteratorWithNumBytes<E> of(final Iterator<E> innerIterator) {
@@ -385,10 +399,11 @@ public final class DataUtil {
 
     /**
      * Create an {@link IteratorWithNumBytes}, with the number of bytes in decoded and serialized form.
-     * @param innerIterator {@link Iterator} to wrap
+     *
+     * @param innerIterator      {@link Iterator} to wrap
      * @param numSerializedBytes the number of bytes in serialized form
-     * @param numEncodedBytes the number of bytes in encoded form
-     * @param <E> the type of decoded object
+     * @param numEncodedBytes    the number of bytes in encoded form
+     * @param <E>                the type of decoded object
      * @return an {@link IteratorWithNumBytes}, with the information about the number of bytes
      */
     static <E> IteratorWithNumBytes<E> of(final Iterator<E> innerIterator,
@@ -432,6 +447,7 @@ public final class DataUtil {
     /**
      * This method should be called after the actual data is taken out of iterator,
      * since the existence of an iterator does not guarantee that data inside it is ready.
+     *
      * @return the number of bytes in serialized form (which is, for example, encoded and compressed)
      * @throws NumBytesNotSupportedException when the operation is not supported
      * @throws IllegalStateException         when the information is not ready
@@ -441,6 +457,7 @@ public final class DataUtil {
     /**
      * This method should be called after the actual data is taken out of iterator,
      * since the existence of an iterator does not guarantee that data inside it is ready.
+     *
      * @return the number of bytes in encoded form (which is ready to be decoded)
      * @throws NumBytesNotSupportedException when the operation is not supported
      * @throws IllegalStateException         when the information is not ready
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/SerializerManager.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/SerializerManager.java
index 46c4609..5ea1bec 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/SerializerManager.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/SerializerManager.java
@@ -15,23 +15,22 @@
  */
 package edu.snu.nemo.runtime.executor.data;
 
-import edu.snu.nemo.runtime.executor.data.streamchainer.CompressionStreamChainer;
-import edu.snu.nemo.runtime.executor.data.streamchainer.StreamChainer;
-import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.coder.EncoderFactory;
+import edu.snu.nemo.runtime.executor.data.streamchainer.*;
 import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
-import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import javax.inject.Inject;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 /**
- * Mapping from RuntimeEdgeId to Coder.
+ * Mapping from RuntimeEdgeId to {@link Serializer}.
  */
 public final class SerializerManager {
   private static final Logger LOG = LoggerFactory.getLogger(SerializerManager.class.getName());
@@ -45,40 +44,53 @@ public final class SerializerManager {
   }
 
   /**
-   * Register a coder for runtime edge.
+   * Register an encoder factory and a decoder factory for a runtime edge.
+   * This method assumes that the compression and decompression properties are empty.
    *
-   * @param runtimeEdgeId id of the runtime edge.
-   * @param coder         the corresponding coder.
+   * @param runtimeEdgeId  id of the runtime edge.
+   * @param encoderFactory the corresponding encoder factory.
+   * @param decoderFactory the corresponding decoder factory.
    */
   public void register(final String runtimeEdgeId,
-                       final Coder coder) {
-    register(runtimeEdgeId, coder, null);
+                       final EncoderFactory encoderFactory,
+                       final DecoderFactory decoderFactory) {
+    register(runtimeEdgeId, encoderFactory, decoderFactory, null, null);
   }
 
   /**
-   * Register a coder for runtime edge.
+   * Register an encoder factory and a decoder factory for a runtime edge.
    *
-   * @param runtimeEdgeId id of the runtime edge.
-   * @param coder         the corresponding coder.
+   * @param runtimeEdgeId         id of the runtime edge.
+   * @param encoderFactory        the corresponding encoder factory.
+   * @param decoderFactory        the corresponding decoder factory.
    * @param compressionProperty   compression property, or null not to enable compression
+   * @param decompressionProperty decompression property, or null not to enable decompression
    */
   public void register(final String runtimeEdgeId,
-                       final Coder coder,
-                       final CompressionProperty.Value compressionProperty) {
+                       final EncoderFactory encoderFactory,
+                       final DecoderFactory decoderFactory,
+                       @Nullable final CompressionProperty.Value compressionProperty,
+                       @Nullable final CompressionProperty.Value decompressionProperty) {
     LOG.debug("{} edge id registering to SerializerManager", runtimeEdgeId);
-    final Serializer serializer = new Serializer(coder, Collections.emptyList());
-    runtimeEdgeIdToSerializer.putIfAbsent(runtimeEdgeId, serializer);
 
-    final List<StreamChainer> streamChainerList = new ArrayList<>();
+    final List<EncodeStreamChainer> encodeStreamChainers = new ArrayList<>();
+    final List<DecodeStreamChainer> decodeStreamChainers = new ArrayList<>();
 
     // Compression chain
     if (compressionProperty != null) {
       LOG.debug("Adding {} compression chain for {}",
           compressionProperty, runtimeEdgeId);
-      streamChainerList.add(new CompressionStreamChainer(compressionProperty));
+      encodeStreamChainers.add(new CompressionStreamChainer(compressionProperty));
+    }
+    if (decompressionProperty != null) {
+      LOG.debug("Adding {} decompression chain for {}",
+          decompressionProperty, runtimeEdgeId);
+      decodeStreamChainers.add(new DecompressionStreamChainer(decompressionProperty));
     }
 
-    serializer.setStreamChainers(streamChainerList);
+    final Serializer serializer =
+        new Serializer(encoderFactory, decoderFactory, encodeStreamChainers, decodeStreamChainers);
+    runtimeEdgeIdToSerializer.putIfAbsent(runtimeEdgeId, serializer);
   }
 
   /**
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
index 200e465..d6d63d1 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.executor.data.partition;
 
 import edu.snu.nemo.common.DirectByteArrayOutputStream;
-import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.coder.EncoderFactory;
 import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
 
 import javax.annotation.Nullable;
@@ -39,7 +39,7 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
   // Will be null when the partition is committed when it is constructed.
   @Nullable private final DirectByteArrayOutputStream bytesOutputStream;
   @Nullable private final OutputStream wrappedStream;
-  @Nullable private final Coder coder;
+  @Nullable private final EncoderFactory.Encoder encoder;
 
   /**
    * Creates a serialized {@link Partition} without actual data.
@@ -57,8 +57,8 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
     this.length = 0;
     this.committed = false;
     this.bytesOutputStream = new DirectByteArrayOutputStream();
-    this.wrappedStream = buildOutputStream(bytesOutputStream, serializer.getStreamChainers());
-    this.coder = serializer.getCoder();
+    this.wrappedStream = buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers());
+    this.encoder = serializer.getEncoderFactory().create(wrappedStream);
   }
 
   /**
@@ -81,7 +81,7 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
     this.committed = true;
     this.bytesOutputStream = null;
     this.wrappedStream = null;
-    this.coder = null;
+    this.encoder = null;
   }
 
   /**
@@ -96,7 +96,7 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
       throw new IOException("The partition is already committed!");
     } else {
       try {
-        coder.encode(element, wrappedStream);
+        encoder.encode(element);
         elementsCount++;
       } catch (final IOException e) {
         wrappedStream.close();
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java
index c5c2e92..467ba18 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java
@@ -17,19 +17,16 @@ package edu.snu.nemo.runtime.executor.data.streamchainer;
 
 import edu.snu.nemo.common.exception.UnsupportedCompressionException;
 import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
-import net.jpountz.lz4.LZ4BlockInputStream;
 import net.jpountz.lz4.LZ4BlockOutputStream;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
 /**
- * {@link StreamChainer} for applying compression.
+ * {@link EncodeStreamChainer} for applying compression.
  */
-public class CompressionStreamChainer implements StreamChainer {
+public class CompressionStreamChainer implements EncodeStreamChainer {
   private final CompressionProperty.Value compression;
 
   /**
@@ -42,18 +39,6 @@ public class CompressionStreamChainer implements StreamChainer {
   }
 
   @Override
-  public final InputStream chainInput(final InputStream in) throws IOException {
-    switch (compression) {
-      case Gzip:
-        return new GZIPInputStream(in);
-      case LZ4:
-        return new LZ4BlockInputStream(in);
-      default:
-        throw new UnsupportedCompressionException("Not supported compression method");
-    }
-  }
-
-  @Override
   public final OutputStream chainOutput(final OutputStream out) throws IOException {
     switch (compression) {
       case Gzip:
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/StreamChainer.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecodeStreamChainer.java
similarity index 67%
copy from runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/StreamChainer.java
copy to runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecodeStreamChainer.java
index 2dbeec7..6084b6e 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/StreamChainer.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecodeStreamChainer.java
@@ -17,13 +17,13 @@ package edu.snu.nemo.runtime.executor.data.streamchainer;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 
 /**
- * A {@link StreamChainer} object indicates each stream manipulation strategy.
- * Stream can be chained by {@link StreamChainer} multiple times.
+ * A {@link DecodeStreamChainer} object indicates each stream manipulation strategy.
+ * Stream can be chained by {@link DecodeStreamChainer} multiple times.
  */
-public interface StreamChainer {
+public interface DecodeStreamChainer {
+
   /**
    * Chain {@link InputStream} and returns chained {@link InputStream}.
    *
@@ -32,13 +32,4 @@ public interface StreamChainer {
    * @throws IOException if fail to chain the stream.
    */
   InputStream chainInput(InputStream in) throws IOException;
-
-  /**
-   * Chain {@link OutputStream} and returns chained {@link OutputStream}.
-   *
-   * @param out the stream which will be chained.
-   * @return chained {@link OutputStream}.
-   * @throws IOException if fail to chain the stream.
-   */
-  OutputStream chainOutput(OutputStream out) throws IOException;
 }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecompressionStreamChainer.java
similarity index 69%
copy from runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java
copy to runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecompressionStreamChainer.java
index c5c2e92..558bd35 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecompressionStreamChainer.java
@@ -18,18 +18,15 @@ package edu.snu.nemo.runtime.executor.data.streamchainer;
 import edu.snu.nemo.common.exception.UnsupportedCompressionException;
 import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
 import net.jpountz.lz4.LZ4BlockInputStream;
-import net.jpountz.lz4.LZ4BlockOutputStream;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
 
 /**
- * {@link StreamChainer} for applying compression.
+ * {@link DecodeStreamChainer} for applying decompression.
  */
-public class CompressionStreamChainer implements StreamChainer {
+public class DecompressionStreamChainer implements DecodeStreamChainer {
   private final CompressionProperty.Value compression;
 
   /**
@@ -37,7 +34,7 @@ public class CompressionStreamChainer implements StreamChainer {
    *
    * @param compression compression method.
    */
-  public CompressionStreamChainer(final CompressionProperty.Value compression) {
+  public DecompressionStreamChainer(final CompressionProperty.Value compression) {
     this.compression = compression;
   }
 
@@ -52,16 +49,4 @@ public class CompressionStreamChainer implements StreamChainer {
         throw new UnsupportedCompressionException("Not supported compression method");
     }
   }
-
-  @Override
-  public final OutputStream chainOutput(final OutputStream out) throws IOException {
-    switch (compression) {
-      case Gzip:
-        return new GZIPOutputStream(out);
-      case LZ4:
-        return new LZ4BlockOutputStream(out);
-      default:
-        throw new UnsupportedCompressionException("Not supported compression method");
-    }
-  }
 }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/StreamChainer.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/EncodeStreamChainer.java
similarity index 67%
rename from runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/StreamChainer.java
rename to runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/EncodeStreamChainer.java
index 2dbeec7..b0c7547 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/StreamChainer.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/EncodeStreamChainer.java
@@ -16,22 +16,13 @@
 package edu.snu.nemo.runtime.executor.data.streamchainer;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 
 /**
- * A {@link StreamChainer} object indicates each stream manipulation strategy.
- * Stream can be chained by {@link StreamChainer} multiple times.
+ * An {@link EncodeStreamChainer} object indicates each stream manipulation strategy.
+ * Stream can be chained by {@link EncodeStreamChainer} multiple times.
  */
-public interface StreamChainer {
-  /**
-   * Chain {@link InputStream} and returns chained {@link InputStream}.
-   *
-   * @param in the stream which will be chained.
-   * @return chained {@link InputStream}.
-   * @throws IOException if fail to chain the stream.
-   */
-  InputStream chainInput(InputStream in) throws IOException;
+public interface EncodeStreamChainer {
 
   /**
    * Chain {@link OutputStream} and returns chained {@link OutputStream}.
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/Serializer.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/Serializer.java
index d79d28f..e30147a 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/Serializer.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/Serializer.java
@@ -15,53 +15,65 @@
  */
 package edu.snu.nemo.runtime.executor.data.streamchainer;
 
-import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.coder.EncoderFactory;
 
 import java.util.List;
 
 /**
- * class that contains {@link Coder} and {@link List} of {@link StreamChainer}.
- * @param <T> coder element type.
+ * class that contains {@link EncoderFactory}, {@link DecoderFactory} and {@link List} of {@link EncodeStreamChainer}.
+ * @param <E> encoderFactory element type.
+ * @param <D> decoderFactory element type.
  */
-public final class Serializer<T> {
-  private Coder<T> coder;
-  private List<StreamChainer> streamChainers;
+public final class Serializer<E, D> {
+  private final EncoderFactory<E> encoderFactory;
+  private final DecoderFactory<D> decoderFactory;
+  private final List<EncodeStreamChainer> encodeStreamChainers;
+  private final List<DecodeStreamChainer> decodeStreamChainers;
 
   /**
    * Constructor.
    *
-   * @param coder      {@link Coder}.
-   * @param streamChainers list of {@link StreamChainer}.
+   * @param encoderFactory              {@link EncoderFactory}.
+   * @param decoderFactory              {@link DecoderFactory}.
+   * @param encodeStreamChainers the list of {@link EncodeStreamChainer} to use for encoding.
+   * @param decodeStreamChainers the list of {@link DecodeStreamChainer} to use for decoding.
    */
-  public Serializer(final Coder<T> coder, final List<StreamChainer> streamChainers) {
-    this.coder = coder;
-    this.streamChainers = streamChainers;
+  public Serializer(final EncoderFactory<E> encoderFactory,
+                    final DecoderFactory<D> decoderFactory,
+                    final List<EncodeStreamChainer> encodeStreamChainers,
+                    final List<DecodeStreamChainer> decodeStreamChainers) {
+    this.encoderFactory = encoderFactory;
+    this.decoderFactory = decoderFactory;
+    this.encodeStreamChainers = encodeStreamChainers;
+    this.decodeStreamChainers = decodeStreamChainers;
   }
 
   /**
-   * method that returns {@link Coder}.
-   *
-   * @return {@link Coder}.
+   * @return the {@link EncoderFactory} to use.
    */
-  public Coder<T> getCoder() {
-    return coder;
+  public EncoderFactory<E> getEncoderFactory() {
+    return encoderFactory;
   }
 
   /**
-   * method that returns list of {@link StreamChainer}.
-   *
-   * @return list of {@link StreamChainer}.
+   * @return the {@link DecoderFactory} to use.
    */
-  public List<StreamChainer> getStreamChainers() {
-    return streamChainers;
+  public DecoderFactory<D> getDecoderFactory() {
+    return decoderFactory;
   }
 
   /**
-   * method that sets list of {@link StreamChainer}.
-   *
-   * @param streamChainers list of {@link StreamChainer}.
+   * @return the list of {@link EncodeStreamChainer} for encoding.
+   */
+  public List<EncodeStreamChainer> getEncodeStreamChainers() {
+    return encodeStreamChainers;
+  }
+
+  /**
+   * @return the list of {@link DecodeStreamChainer} for decoding.
    */
-  public void setStreamChainers(final List<StreamChainer> streamChainers) {
-    this.streamChainers = streamChainers;
+  public List<DecodeStreamChainer> getDecodeStreamChainers() {
+    return decodeStreamChainers;
   }
 }
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
index 74c2504..b9f0c63 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
@@ -16,11 +16,9 @@
 package edu.snu.nemo.runtime.executor.data;
 
 import edu.snu.nemo.common.Pair;
-import edu.snu.nemo.common.coder.IntCoder;
-import edu.snu.nemo.common.coder.PairCoder;
+import edu.snu.nemo.common.coder.*;
 import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
 import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.data.HashRange;
 import edu.snu.nemo.runtime.common.data.KeyRange;
@@ -30,6 +28,7 @@ import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
 import edu.snu.nemo.runtime.common.state.BlockState;
 import edu.snu.nemo.runtime.executor.data.block.Block;
 import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
+import edu.snu.nemo.runtime.executor.data.streamchainer.DecompressionStreamChainer;
 import edu.snu.nemo.runtime.executor.data.streamchainer.CompressionStreamChainer;
 import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
 import edu.snu.nemo.runtime.executor.data.stores.*;
@@ -71,9 +70,11 @@ import static org.mockito.Mockito.when;
 @PrepareForTest({BlockManagerMaster.class, RuntimeMaster.class, SerializerManager.class})
 public final class BlockStoreTest {
   private static final String TMP_FILE_DIRECTORY = "./tmpFiles";
-  private static final Coder CODER = PairCoder.of(IntCoder.of(), IntCoder.of());
-  private static final Serializer SERIALIZER = new Serializer(CODER,
-      Collections.singletonList(new CompressionStreamChainer(CompressionProperty.Value.LZ4)));
+  private static final Serializer SERIALIZER = new Serializer(
+      PairEncoderFactory.of(IntEncoderFactory.of(), IntEncoderFactory.of()),
+      PairDecoderFactory.of(IntDecoderFactory.of(), IntDecoderFactory.of()),
+      Collections.singletonList(new CompressionStreamChainer(CompressionProperty.Value.LZ4)),
+      Collections.singletonList(new DecompressionStreamChainer(CompressionProperty.Value.LZ4)));
   private static final SerializerManager serializerManager = mock(SerializerManager.class);
   private BlockManagerMaster blockManagerMaster;
   private LocalMessageDispatcher messageDispatcher;
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index 012701c..0b119d8 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -15,8 +15,7 @@
  */
 package edu.snu.nemo.runtime.executor.datatransfer;
 
-import edu.snu.nemo.common.coder.IntCoder;
-import edu.snu.nemo.common.coder.PairCoder;
+import edu.snu.nemo.common.coder.*;
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.*;
@@ -26,7 +25,6 @@ import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import edu.snu.nemo.common.test.EmptyComponents;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.Pair;
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
@@ -43,9 +41,6 @@ import edu.snu.nemo.runtime.executor.Executor;
 import edu.snu.nemo.runtime.executor.MetricManagerWorker;
 import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
 import edu.snu.nemo.runtime.executor.data.SerializerManager;
-import edu.snu.nemo.runtime.executor.datatransfer.DataTransferFactory;
-import edu.snu.nemo.runtime.executor.datatransfer.InputReader;
-import edu.snu.nemo.runtime.executor.datatransfer.OutputWriter;
 import edu.snu.nemo.runtime.master.MetricMessageHandler;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
@@ -105,7 +100,9 @@ public final class DataTransferTest {
   private static final int PARALLELISM_TEN = 10;
   private static final String EDGE_PREFIX_TEMPLATE = "Dummy(%d)";
   private static final AtomicInteger TEST_INDEX = new AtomicInteger(0);
-  private static final Coder CODER = PairCoder.of(IntCoder.of(), IntCoder.of());
+  private static final EncoderFactory ENCODER_FACTORY = PairEncoderFactory.of(IntEncoderFactory.of(), IntEncoderFactory.of());
+  private static final DecoderFactory DECODER_FACTORY =
+      PairDecoderFactory.of(IntDecoderFactory.of(), IntDecoderFactory.of());
   private static final Tang TANG = Tang.Factory.getTang();
   private static final int HASH_RANGE_MULTIPLIER = 10;
 
@@ -307,14 +304,14 @@ public final class DataTransferTest {
 
     // Edge setup
     final IREdge dummyIREdge = new IREdge(commPattern, srcVertex, dstVertex);
-    dummyIREdge.setProperty(CoderProperty.of(CODER));
     dummyIREdge.setProperty(KeyExtractorProperty.of((element -> element)));
+    dummyIREdge.setProperty(DataCommunicationPatternProperty.of(commPattern));
+    dummyIREdge.setProperty(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
+    dummyIREdge.setProperty(DataStoreProperty.of(store));
+    dummyIREdge.setProperty(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
+    dummyIREdge.setProperty(EncoderProperty.of(ENCODER_FACTORY));
+    dummyIREdge.setProperty(DecoderProperty.of(DECODER_FACTORY));
     final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
-    edgeProperties.put(DataCommunicationPatternProperty.of(commPattern));
-    edgeProperties.put(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
-    edgeProperties.put(DataStoreProperty.of(store));
-    edgeProperties.put(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
-    edgeProperties.put(CoderProperty.of(CODER));
     final RuntimeEdge dummyEdge;
 
     final IRVertex srcMockVertex = mock(IRVertex.class);
@@ -398,20 +395,20 @@ public final class DataTransferTest {
 
     // Edge setup
     final IREdge dummyIREdge = new IREdge(commPattern, srcVertex, dstVertex);
-    dummyIREdge.setProperty(CoderProperty.of(CODER));
+    dummyIREdge.setProperty(EncoderProperty.of(ENCODER_FACTORY));
+    dummyIREdge.setProperty(DecoderProperty.of(DECODER_FACTORY));
     dummyIREdge.setProperty(KeyExtractorProperty.of((element -> element)));
-    final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
-    edgeProperties.put(DataCommunicationPatternProperty.of(commPattern));
-    edgeProperties.put(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
-    edgeProperties.put(DuplicateEdgeGroupProperty.of(new DuplicateEdgeGroupPropertyValue("dummy")));
+    dummyIREdge.setProperty(DataCommunicationPatternProperty.of(commPattern));
+    dummyIREdge.setProperty(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
+    dummyIREdge.setProperty(DuplicateEdgeGroupProperty.of(new DuplicateEdgeGroupPropertyValue("dummy")));
     final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty
-        = edgeProperties.get(DuplicateEdgeGroupProperty.class);
+        = dummyIREdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
     duplicateDataProperty.get().setRepresentativeEdgeId(edgeId);
     duplicateDataProperty.get().setGroupSize(2);
-
-    edgeProperties.put(DataStoreProperty.of(store));
-    edgeProperties.put(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
+    dummyIREdge.setProperty(DataStoreProperty.of(store));
+    dummyIREdge.setProperty(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
     final RuntimeEdge dummyEdge, dummyEdge2;
+    final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
 
     final IRVertex srcMockVertex = mock(IRVertex.class);
     final IRVertex dstMockVertex = mock(IRVertex.class);
@@ -523,8 +520,8 @@ public final class DataTransferTest {
   private Pair<IRVertex, IRVertex> setupVertices(final String edgeId,
                                                  final BlockManagerWorker sender,
                                                  final BlockManagerWorker receiver) {
-    serializerManagers.get(sender).register(edgeId, CODER);
-    serializerManagers.get(receiver).register(edgeId, CODER);
+    serializerManagers.get(sender).register(edgeId, ENCODER_FACTORY, DECODER_FACTORY);
+    serializerManagers.get(receiver).register(edgeId, ENCODER_FACTORY, DECODER_FACTORY);
 
     // Src setup
     final SourceVertex srcVertex = new EmptyComponents.EmptySourceVertex("Source");
@@ -543,10 +540,10 @@ public final class DataTransferTest {
                                                  final String edgeId2,
                                                  final BlockManagerWorker sender,
                                                  final BlockManagerWorker receiver) {
-    serializerManagers.get(sender).register(edgeId, CODER);
-    serializerManagers.get(receiver).register(edgeId, CODER);
-    serializerManagers.get(sender).register(edgeId2, CODER);
-    serializerManagers.get(receiver).register(edgeId2, CODER);
+    serializerManagers.get(sender).register(edgeId, ENCODER_FACTORY, DECODER_FACTORY);
+    serializerManagers.get(receiver).register(edgeId, ENCODER_FACTORY, DECODER_FACTORY);
+    serializerManagers.get(sender).register(edgeId2, ENCODER_FACTORY, DECODER_FACTORY);
+    serializerManagers.get(receiver).register(edgeId2, ENCODER_FACTORY, DECODER_FACTORY);
 
     // Src setup
     final SourceVertex srcVertex = new EmptyComponents.EmptySourceVertex("Source");
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
index 55d1d3f..998f3aa 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -17,7 +17,6 @@ package edu.snu.nemo.runtime.executor.task;
 
 import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.ir.OutputCollector;
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.Readable;
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPassTest.java
index b403f7b..002873c 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPassTest.java
@@ -16,13 +16,16 @@
 package edu.snu.nemo.tests.compiler.optimizer.pass.compiletime.annotating;
 
 import edu.snu.nemo.client.JobLauncher;
-import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.coder.EncoderFactory;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AnnotatingPass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultEdgeCoderPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultEdgeDecoderPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultEdgeEncoderPass;
 import edu.snu.nemo.tests.compiler.CompilerTestUtil;
 import org.junit.Before;
 import org.junit.Test;
@@ -33,7 +36,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import static org.junit.Assert.assertEquals;
 
 /**
- * Test {@link edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultEdgeCoderPass}.
+ * Test {@link DefaultEdgeEncoderPass} and {@link DefaultEdgeDecoderPass}.
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
@@ -47,33 +50,43 @@ public class DefaultEdgeCoderPassTest {
 
   @Test
   public void testAnnotatingPass() {
-    final AnnotatingPass coderPass = new DefaultEdgeCoderPass();
-    assertEquals(CoderProperty.class, coderPass.getExecutionPropertyToModify());
+    final AnnotatingPass encoderPass = new DefaultEdgeEncoderPass();
+    assertEquals(EncoderProperty.class, encoderPass.getExecutionPropertyToModify());
+    final AnnotatingPass decoderPass = new DefaultEdgeDecoderPass();
+    assertEquals(DecoderProperty.class, decoderPass.getExecutionPropertyToModify());
   }
 
   @Test
   public void testNotOverride() {
     // Get the first coder from the compiled DAG
-    final Coder compiledCoder = compiledDAG
-        .getOutgoingEdgesOf(compiledDAG.getTopologicalSort().get(0)).get(0).getPropertyValue(CoderProperty.class).get();
-    final DAG<IRVertex, IREdge> processedDAG = new DefaultEdgeCoderPass().apply(compiledDAG);
+    final IREdge irEdge = compiledDAG.getOutgoingEdgesOf(compiledDAG.getTopologicalSort().get(0)).get(0);
+    final EncoderFactory compiledEncoderFactory = irEdge.getPropertyValue(EncoderProperty.class).get();
+    final DecoderFactory compiledDecoderFactory = irEdge.getPropertyValue(DecoderProperty.class).get();
+    DAG<IRVertex, IREdge> processedDAG = new DefaultEdgeEncoderPass().apply(compiledDAG);
+    processedDAG = new DefaultEdgeDecoderPass().apply(processedDAG);
 
     // Get the first coder from the processed DAG
-    final Coder processedCoder = processedDAG.getOutgoingEdgesOf(processedDAG.getTopologicalSort().get(0))
-        .get(0).getPropertyValue(CoderProperty.class).get();
-    assertEquals(compiledCoder, processedCoder); // It must not be changed.
+    final IREdge processedIREdge = processedDAG.getOutgoingEdgesOf(processedDAG.getTopologicalSort().get(0)).get(0);
+    final EncoderFactory processedEncoderFactory = processedIREdge.getPropertyValue(EncoderProperty.class).get();
+    assertEquals(compiledEncoderFactory, processedEncoderFactory); // It must not be changed.
+    final DecoderFactory processedDecoderFactory = processedIREdge.getPropertyValue(DecoderProperty.class).get();
+    assertEquals(compiledDecoderFactory, processedDecoderFactory); // It must not be changed.
   }
 
   @Test
   public void testSetToDefault() throws Exception {
     // Remove the first coder from the compiled DAG (to let our pass to set as default coder).
-    compiledDAG.getOutgoingEdgesOf(compiledDAG.getTopologicalSort().get(0))
-        .get(0).getExecutionProperties().remove(CoderProperty.class);
-    final DAG<IRVertex, IREdge> processedDAG = new DefaultEdgeCoderPass().apply(compiledDAG);
+    final IREdge irEdge = compiledDAG.getOutgoingEdgesOf(compiledDAG.getTopologicalSort().get(0)).get(0);
+    irEdge.getExecutionProperties().remove(EncoderProperty.class);
+    irEdge.getExecutionProperties().remove(DecoderProperty.class);
+    DAG<IRVertex, IREdge> processedDAG = new DefaultEdgeEncoderPass().apply(compiledDAG);
+    processedDAG = new DefaultEdgeDecoderPass().apply(processedDAG);
 
-    // Check whether the pass set the empty coder to our default coder.
-    final Coder processedCoder = processedDAG.getOutgoingEdgesOf(processedDAG.getTopologicalSort().get(0))
-        .get(0).getPropertyValue(CoderProperty.class).get();
-    assertEquals(Coder.DUMMY_CODER, processedCoder);
+    // Check whether the pass set the missing encoder & decoder to our default encoder & decoder.
+    final IREdge processedIREdge = processedDAG.getOutgoingEdgesOf(processedDAG.getTopologicalSort().get(0)).get(0);
+    final EncoderFactory processedEncoderFactory = processedIREdge.getPropertyValue(EncoderProperty.class).get();
+    final DecoderFactory processedDecoderFactory = processedIREdge.getPropertyValue(DecoderProperty.class).get();
+    assertEquals(EncoderFactory.DUMMY_ENCODER_FACTORY, processedEncoderFactory);
+    assertEquals(DecoderFactory.DUMMY_DECODER_FACTORY, processedDecoderFactory);
   }
 }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
index 8a6b1a8..d2d6954 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
@@ -20,7 +20,8 @@ import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.LoopVertex;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping.LoopExtractionPass;
@@ -114,13 +115,15 @@ public class LoopFusionPassTest {
     loopVertexToFollow.getIterativeIncomingEdges().values().forEach(irEdges -> irEdges.forEach(irEdge -> {
       final IREdge newIREdge = new IREdge(irEdge.getPropertyValue(DataCommunicationPatternProperty.class).get(),
           vertexToBeFollowed, loopVertexToFollow);
-      newIREdge.setProperty(CoderProperty.of(irEdge.getPropertyValue(CoderProperty.class).get()));
+      newIREdge.setProperty(EncoderProperty.of(irEdge.getPropertyValue(EncoderProperty.class).get()));
+      newIREdge.setProperty(DecoderProperty.of(irEdge.getPropertyValue(DecoderProperty.class).get()));
       builder.connectVertices(newIREdge);
     }));
     loopVertexToFollow.getNonIterativeIncomingEdges().values().forEach(irEdges -> irEdges.forEach(irEdge -> {
       final IREdge newIREdge = new IREdge(irEdge.getPropertyValue(DataCommunicationPatternProperty.class).get(),
           irEdge.getSrc(), loopVertexToFollow);
-      newIREdge.setProperty(CoderProperty.of(irEdge.getPropertyValue(CoderProperty.class).get()));
+      newIREdge.setProperty(EncoderProperty.of(irEdge.getPropertyValue(EncoderProperty.class).get()));
+      newIREdge.setProperty(DecoderProperty.of(irEdge.getPropertyValue(DecoderProperty.class).get()));
       builder.connectVertices(newIREdge);
     }));
   }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
index de6ff56..a461873 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
@@ -20,7 +20,8 @@ import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.LoopVertex;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping.LoopExtractionPass;
@@ -92,7 +93,8 @@ public class LoopInvariantCodeMotionPassTest {
             assertTrue(incomingEdge.isPresent());
             final IREdge newIREdge = new IREdge(incomingEdge.get().getPropertyValue(
                 DataCommunicationPatternProperty.class).get(), incomingEdge.get().getSrc(), alsLoop);
-            newIREdge.setProperty(CoderProperty.of(incomingEdge.get().getPropertyValue(CoderProperty.class).get()));
+            newIREdge.setProperty(EncoderProperty.of(incomingEdge.get().getPropertyValue(EncoderProperty.class).get()));
+            newIREdge.setProperty(DecoderProperty.of(incomingEdge.get().getPropertyValue(DecoderProperty.class).get()));
             builder.connectVertices(newIREdge);
           }
         });
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
index 793c182..8d13374 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
@@ -29,21 +29,21 @@ public final class PolicyBuilderTest {
   @Test
   public void testDisaggregationPolicy() {
     final Policy disaggregationPolicy = new DisaggregationPolicy();
-    assertEquals(13, disaggregationPolicy.getCompileTimePasses().size());
+    assertEquals(15, disaggregationPolicy.getCompileTimePasses().size());
     assertEquals(0, disaggregationPolicy.getRuntimePasses().size());
   }
 
   @Test
   public void testPadoPolicy() {
     final Policy padoPolicy = new PadoPolicy();
-    assertEquals(15, padoPolicy.getCompileTimePasses().size());
+    assertEquals(17, padoPolicy.getCompileTimePasses().size());
     assertEquals(0, padoPolicy.getRuntimePasses().size());
   }
 
   @Test
   public void testDataSkewPolicy() {
     final Policy dataSkewPolicy = new DataSkewPolicy();
-    assertEquals(17, dataSkewPolicy.getCompileTimePasses().size());
+    assertEquals(19, dataSkewPolicy.getCompileTimePasses().size());
     assertEquals(1, dataSkewPolicy.getRuntimePasses().size());
   }
 


Mime
View raw message