nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From johnya...@apache.org
Subject [incubator-nemo] branch master updated: [NEMO-101] Make Coder as an execution property (#32)
Date Fri, 15 Jun 2018 00:19:33 GMT
This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ac7478  [NEMO-101] Make Coder as an execution property (#32)
7ac7478 is described below

commit 7ac7478098293df93a304dfb5d556b3863eecd5f
Author: Sanha Lee <sanhaleehana@gmail.com>
AuthorDate: Fri Jun 15 09:19:27 2018 +0900

    [NEMO-101] Make Coder as an execution property (#32)
    
    JIRA: [NEMO-101: Make Coder as an execution property](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-101)
    
    **Major changes:**
    - Make `Coder` as an execution property and let optimization pass to configure it.
    
    **Minor changes to note:**
    - Implement `DefaultEdgeCoderPass` which assign `Coder.DUMMY_CODER` as a default and include it in `PrimitiveCompositePass`.
    
    **Tests for the changes:**
    - `DefaultEdgeCoderPassTest` is added for `DefaultEdgeCoderPass`.
    - Existing other unit tests and integration tests also cover the change.
    
    **Other comments:**
    - N/A
    
    resolves [NEMO-101: Make Coder as an execution property](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-101)
---
 bin/json2dot.py                                    | 44 +++---------
 .../main/java/edu/snu/nemo/common/coder/Coder.java |  5 +-
 .../java/edu/snu/nemo/common/ir/edge/IREdge.java   | 39 ++++-------
 ...eyExtractorProperty.java => CoderProperty.java} | 16 +++--
 .../executionproperty/KeyExtractorProperty.java    |  2 +
 .../ir/executionproperty/ExecutionProperty.java    | 20 ++++++
 .../edu/snu/nemo/common/ir/vertex/LoopVertex.java  |  8 +--
 .../frontend/beam/NemoPipelineVisitor.java         |  9 +--
 .../frontend/spark/core/SparkFrontendUtils.java    |  8 ++-
 .../frontend/spark/core/rdd/PairRDDFunctions.scala |  7 +-
 .../compiler/frontend/spark/core/rdd/RDD.scala     | 28 +++++---
 .../compiler/optimizer/CompiletimeOptimizer.java   |  2 +-
 .../MapReduceDisaggregationOptimization.java       |  7 +-
 .../annotating/DefaultEdgeCoderPass.java           | 51 ++++++++++++++
 .../composite/PrimitiveCompositePass.java          |  1 +
 .../CommonSubexpressionEliminationPass.java        |  4 +-
 .../reshaping/DataSkewReshapingPass.java           |  6 +-
 .../compiletime/reshaping/LoopExtractionPass.java  | 16 ++---
 .../compiletime/reshaping/LoopOptimizations.java   | 11 +--
 .../reshaping/SailfishRelayReshapingPass.java      |  8 ++-
 .../runtime/common/plan/PhysicalPlanGenerator.java |  4 +-
 .../snu/nemo/runtime/common/plan/RuntimeEdge.java  | 59 +++++++---------
 .../snu/nemo/runtime/common/plan/StageEdge.java    | 34 +++++-----
 .../nemo/runtime/common/plan/StageEdgeBuilder.java | 27 ++++----
 .../edu/snu/nemo/runtime/executor/Executor.java    | 13 ++--
 .../runtime/executor/task/TaskExecutorTest.java    |  3 +-
 .../runtime/plangenerator/TestPlanGenerator.java   | 13 ++--
 .../snu/nemo/tests/common/ir/LoopVertexTest.java   | 27 +++-----
 .../ExecutionPropertyMapTest.java                  |  6 +-
 .../compiler/backend/nemo/NemoBackendTest.java     | 11 ++-
 .../annotating/DefaultEdgeCoderPassTest.java       | 79 ++++++++++++++++++++++
 .../annotating/DefaultParallelismPassTest.java     |  2 +-
 .../CommonSubexpressionEliminationPassTest.java    | 17 +++--
 .../compiletime/reshaping/LoopFusionPassTest.java  | 10 ++-
 .../reshaping/LoopInvariantCodeMotionPassTest.java | 11 +--
 .../optimizer/policy/PolicyBuilderTest.java        |  6 +-
 .../runtime/common/plan/DAGConverterTest.java      | 14 ++--
 .../executor/datatransfer/DataTransferTest.java    | 40 ++++++++---
 38 files changed, 404 insertions(+), 264 deletions(-)

diff --git a/bin/json2dot.py b/bin/json2dot.py
index 85538e7..8a841f8 100755
--- a/bin/json2dot.py
+++ b/bin/json2dot.py
@@ -27,7 +27,8 @@ import re
 nextIdx = 0
 
 def edgePropertiesString(properties):
-    return '/'.join(['SideInput' if x[0] == 'IsSideInput' else x[1].split('.')[-1] for x in sorted(properties.items())])
+    prop = {p[0]: p[1] for p in properties.items() if p[0] != 'Coder'}
+    return '/'.join(['SideInput' if x[0] == 'IsSideInput' else x[1].split('.')[-1] for x in sorted(prop.items())])
 
 def getIdx():
     global nextIdx
@@ -280,10 +281,6 @@ def Edge(src, dst, properties):
     except:
         pass
     try:
-        return StageEdge(src, dst, properties)
-    except:
-        pass
-    try:
         return RuntimeEdge(src, dst, properties)
     except:
         pass
@@ -308,7 +305,7 @@ class IREdge:
         self.dst = dst
         self.id = properties['id']
         self.executionProperties = properties['executionProperties']
-        self.coder = properties['coder']
+        self.coder = self.executionProperties['Coder']
     @property
     def dot(self):
         src = self.src
@@ -327,37 +324,14 @@ class IREdge:
 
 class StageEdge:
     def __init__(self, src, dst, properties):
-        self.src = src
-        self.dst = dst
-        self.runtimeEdgeId = properties['runtimeEdgeId']
-        self.edgeProperties = properties['edgeProperties']
-        self.externalVertexAttr = properties['externalVertexAttr']
-        self.parallelism = self.externalVertexAttr['Parallelism']
-        self.coder = properties['coder']
-    @property
-    def dot(self):
-        color = 'black'
-        try:
-            if self.externalVertexAttr['ContainerType'] == 'Transient':
-                color = 'orange'
-            if self.externalVertexAttr['ContainerType'] == 'Reserved':
-                color = 'green'
-        except:
-            pass
-        label = '{} (p{})<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(self.runtimeEdgeId, self.parallelism, edgePropertiesString(self.edgeProperties), self.coder)
-        return '{} -> {} [ltail = {}, lhead = {}, label = <{}>, color = {}];'.format(self.src.oneVertex.idx,
-                self.dst.oneVertex.idx, self.src.logicalEnd, self.dst.logicalEnd, label, color)
-
-class StageEdge:
-    def __init__(self, src, dst, properties):
         self.src = src.internalDAG.vertices[properties['srcVertex']]
         self.dst = dst.internalDAG.vertices[properties['dstVertex']]
         self.runtimeEdgeId = properties['runtimeEdgeId']
-        self.edgeProperties = properties['edgeProperties']
-        self.coder = properties['coder']
+        self.executionProperties = properties['executionProperties']
+        self.coder = self.executionProperties['Coder']
     @property
     def dot(self):
-        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.edgeProperties), self.coder)
+        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.executionProperties), self.coder)
         return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(self.src.oneVertex.idx,
                 self.dst.oneVertex.idx, self.src.logicalEnd, self.dst.logicalEnd, label)
 
@@ -366,11 +340,11 @@ class RuntimeEdge:
         self.src = src
         self.dst = dst
         self.runtimeEdgeId = properties['runtimeEdgeId']
-        self.edgeProperties = properties['edgeProperties']
-        self.coder = properties['coder']
+        self.executionProperties = properties['executionProperties']
+        self.coder = self.executionProperties['Coder']
     @property
     def dot(self):
-        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.edgeProperties), self.coder)
+        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.executionProperties), self.coder)
         return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(self.src.oneVertex.idx,
                 self.dst.oneVertex.idx, self.src.logicalEnd, self.dst.logicalEnd, label)
 
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/Coder.java b/common/src/main/java/edu/snu/nemo/common/coder/Coder.java
index c0d9eb4..4507143 100644
--- a/common/src/main/java/edu/snu/nemo/common/coder/Coder.java
+++ b/common/src/main/java/edu/snu/nemo/common/coder/Coder.java
@@ -21,7 +21,8 @@ import java.io.OutputStream;
 import java.io.Serializable;
 
 /**
- * A {@link Coder Coder&lt;T&gt;} object encodes or decodes values of type {@code T} into byte streams.
+ * A coder object encodes or decodes values of type {@code T} into byte streams.
+ *
  * @param <T> element type.
  */
 public interface Coder<T> extends Serializable {
@@ -31,7 +32,7 @@ public interface Coder<T> extends Serializable {
    * Because the user can want to keep a single output stream and continuously concatenate elements,
    * the output stream should not be closed.
    *
-   * @param element the element to be encoded
+   * @param element   the element to be encoded
    * @param outStream the stream on which encoded bytes are written
    * @throws IOException if fail to encode
    */
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java
index 9dbaabb..cfb6497 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java
@@ -15,7 +15,6 @@
  */
 package edu.snu.nemo.common.ir.edge;
 
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.dag.Edge;
 import edu.snu.nemo.common.ir.IdManager;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
@@ -30,44 +29,42 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
  */
 public final class IREdge extends Edge<IRVertex> {
   private final ExecutionPropertyMap executionProperties;
-  private final Coder coder;
   private final Boolean isSideInput;
 
   /**
    * Constructor of IREdge.
+   * This constructor assumes that this edge is not for a side input.
+   *
    * @param commPattern data communication pattern type of the edge.
-   * @param src source vertex.
-   * @param dst destination vertex.
-   * @param coder coder.
+   * @param src         source vertex.
+   * @param dst         destination vertex.
    */
   public IREdge(final DataCommunicationPatternProperty.Value commPattern,
                 final IRVertex src,
-                final IRVertex dst,
-                final Coder coder) {
-    this(commPattern, src, dst, coder, false);
+                final IRVertex dst) {
+    this(commPattern, src, dst, false);
   }
 
   /**
    * Constructor of IREdge.
+   *
    * @param commPattern data communication pattern type of the edge.
-   * @param src source vertex.
-   * @param dst destination vertex.
-   * @param coder coder.
+   * @param src         source vertex.
+   * @param dst         destination vertex.
    * @param isSideInput flag for whether or not the edge is a sideInput.
    */
   public IREdge(final DataCommunicationPatternProperty.Value commPattern,
                 final IRVertex src,
                 final IRVertex dst,
-                final Coder coder,
                 final Boolean isSideInput) {
     super(IdManager.newEdgeId(), src, dst);
-    this.coder = coder;
     this.isSideInput = isSideInput;
     this.executionProperties = ExecutionPropertyMap.of(this, commPattern);
   }
 
   /**
    * Set an executionProperty of the IREdge.
+   *
    * @param executionProperty the execution property.
    * @return the IREdge with the execution property set.
    */
@@ -78,7 +75,8 @@ public final class IREdge extends Edge<IRVertex> {
 
   /**
    * Get the executionProperty of the IREdge.
-   * @param <T> Type of the return value.
+   *
+   * @param <T>                  Type of the return value.
    * @param executionPropertyKey key of the execution property.
    * @return the execution property.
    */
@@ -94,13 +92,6 @@ public final class IREdge extends Edge<IRVertex> {
   }
 
   /**
-   * @return coder for the edge.
-   */
-  public Coder getCoder() {
-    return coder;
-  }
-
-  /**
    * @return whether or not the edge is a side input edge.
    */
   public Boolean isSideInput() {
@@ -117,6 +108,7 @@ public final class IREdge extends Edge<IRVertex> {
 
   /**
    * Static function to copy executionProperties from an edge to the other.
+   *
    * @param thatEdge the edge to copy executionProperties to.
    */
   public void copyExecutionPropertiesTo(final IREdge thatEdge) {
@@ -132,7 +124,7 @@ public final class IREdge extends Edge<IRVertex> {
       return false;
     }
 
-    IREdge irEdge = (IREdge) o;
+    final IREdge irEdge = (IREdge) o;
 
     return executionProperties.equals(irEdge.getExecutionProperties()) && hasSameItineraryAs(irEdge);
   }
@@ -151,8 +143,7 @@ public final class IREdge extends Edge<IRVertex> {
     final StringBuilder sb = new StringBuilder();
     sb.append("{\"id\": \"").append(getId());
     sb.append("\", \"executionProperties\": ").append(executionProperties);
-    sb.append(", \"coder\": \"").append(coder.toString());
-    sb.append("\"}");
+    sb.append("}");
     return sb.toString();
   }
 }
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyExtractorProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java
similarity index 73%
copy from common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyExtractorProperty.java
copy to common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java
index eaf73a6..1d2e5dc 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyExtractorProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java
@@ -15,27 +15,29 @@
  */
 package edu.snu.nemo.common.ir.edge.executionproperty;
 
-import edu.snu.nemo.common.KeyExtractor;
+import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 
 /**
- * KeyExtractor ExecutionProperty.
+ * Coder ExecutionProperty.
  */
-public final class KeyExtractorProperty extends ExecutionProperty<KeyExtractor> {
+public final class CoderProperty extends ExecutionProperty<Coder> {
   /**
    * Constructor.
+   *
    * @param value value of the execution property.
    */
-  private KeyExtractorProperty(final KeyExtractor value) {
-    super(Key.KeyExtractor, value);
+  private CoderProperty(final Coder value) {
+    super(Key.Coder, value);
   }
 
   /**
    * Static method exposing the constructor.
+   *
    * @param value value of the new execution property.
    * @return the newly created execution property.
    */
-  public static KeyExtractorProperty of(final KeyExtractor value) {
-    return new KeyExtractorProperty(value);
+  public static CoderProperty of(final Coder value) {
+    return new CoderProperty(value);
   }
 }
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyExtractorProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyExtractorProperty.java
index eaf73a6..0387e07 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyExtractorProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyExtractorProperty.java
@@ -24,6 +24,7 @@ import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 public final class KeyExtractorProperty extends ExecutionProperty<KeyExtractor> {
   /**
    * Constructor.
+   *
    * @param value value of the execution property.
    */
   private KeyExtractorProperty(final KeyExtractor value) {
@@ -32,6 +33,7 @@ public final class KeyExtractorProperty extends ExecutionProperty<KeyExtractor>
 
   /**
    * Static method exposing the constructor.
+   *
    * @param value value of the new execution property.
    * @return the newly created execution property.
    */
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionProperty.java
index ef55daa..3bb4a4f 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionProperty.java
@@ -16,6 +16,7 @@
 package edu.snu.nemo.common.ir.executionproperty;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 /**
  * An abstract class for each execution factors.
@@ -59,6 +60,24 @@ public abstract class ExecutionProperty<T> implements Serializable {
     };
   }
 
+  @Override
+  public final boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    final ExecutionProperty<?> that = (ExecutionProperty<?>) o;
+    return getKey() == that.getKey()
+        && Objects.equals(getValue(), that.getValue());
+  }
+
+  @Override
+  public final int hashCode() {
+    return Objects.hash(getKey(), getValue());
+  }
+
   /**
    * Key for different types of execution property.
    */
@@ -73,6 +92,7 @@ public abstract class ExecutionProperty<T> implements Serializable {
     UsedDataHandling,
     Compression,
     DuplicateEdgeGroup,
+    Coder,
 
     // Applies to IRVertex
     DynamicOptimizationType,
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java
index 75f92cd..78d153a 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java
@@ -218,7 +218,7 @@ public final class LoopVertex extends IRVertex {
       dagToAdd.getIncomingEdgesOf(irVertex).forEach(edge -> {
         final IRVertex newSrc = originalToNewIRVertex.get(edge.getSrc());
         final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-            newSrc, newIrVertex, edge.getCoder(), edge.isSideInput());
+            newSrc, newIrVertex, edge.isSideInput());
         edge.copyExecutionPropertiesTo(newIrEdge);
         dagBuilder.connectVertices(newIrEdge);
       });
@@ -227,7 +227,7 @@ public final class LoopVertex extends IRVertex {
     // process DAG incoming edges.
     getDagIncomingEdges().forEach((dstVertex, irEdges) -> irEdges.forEach(edge -> {
       final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-          edge.getSrc(), originalToNewIRVertex.get(dstVertex), edge.getCoder(), edge.isSideInput());
+          edge.getSrc(), originalToNewIRVertex.get(dstVertex), edge.isSideInput());
       edge.copyExecutionPropertiesTo(newIrEdge);
       dagBuilder.connectVertices(newIrEdge);
     }));
@@ -236,7 +236,7 @@ public final class LoopVertex extends IRVertex {
       // if termination condition met, we process the DAG outgoing edge.
       getDagOutgoingEdges().forEach((srcVertex, irEdges) -> irEdges.forEach(edge -> {
         final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-            originalToNewIRVertex.get(srcVertex), edge.getDst(), edge.getCoder(), edge.isSideInput());
+            originalToNewIRVertex.get(srcVertex), edge.getDst(), edge.isSideInput());
         edge.copyExecutionPropertiesTo(newIrEdge);
         dagBuilder.addVertex(edge.getDst()).connectVertices(newIrEdge);
       }));
@@ -247,7 +247,7 @@ public final class LoopVertex extends IRVertex {
     this.nonIterativeIncomingEdges.forEach((dstVertex, irEdges) -> irEdges.forEach(this::addDagIncomingEdge));
     this.iterativeIncomingEdges.forEach((dstVertex, irEdges) -> irEdges.forEach(edge -> {
       final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-          originalToNewIRVertex.get(edge.getSrc()), dstVertex, edge.getCoder(), edge.isSideInput());
+          originalToNewIRVertex.get(edge.getSrc()), dstVertex, edge.isSideInput());
       edge.copyExecutionPropertiesTo(newIrEdge);
       this.addDagIncomingEdge(newIrEdge);
     }));
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
index f61adc6..3a7f9bd 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
@@ -15,6 +15,7 @@
  */
 package edu.snu.nemo.compiler.frontend.beam;
 
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
 import edu.snu.nemo.compiler.frontend.beam.coder.BeamCoder;
 import edu.snu.nemo.common.dag.DAGBuilder;
@@ -105,8 +106,8 @@ public final class NemoPipelineVisitor extends Pipeline.PipelineVisitor.Defaults
     beamNode.getInputs().values().stream().filter(pValueToVertex::containsKey)
         .forEach(pValue -> {
           final IRVertex src = pValueToVertex.get(pValue);
-          final BeamCoder coder = pValueToCoder.get(pValue);
-          final IREdge edge = new IREdge(getEdgeCommunicationPattern(src, irVertex), src, irVertex, coder);
+          final IREdge edge = new IREdge(getEdgeCommunicationPattern(src, irVertex), src, irVertex);
+          edge.setProperty(CoderProperty.of(pValueToCoder.get(pValue)));
           edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));
           this.builder.connectVertices(edge);
         });
@@ -200,9 +201,9 @@ public final class NemoPipelineVisitor extends Pipeline.PipelineVisitor.Defaults
     sideInputs.stream().filter(pValueToVertex::containsKey)
         .forEach(pValue -> {
           final IRVertex src = pValueToVertex.get(pValue);
-          final BeamCoder coder = pValueToCoder.get(pValue);
           final IREdge edge = new IREdge(getEdgeCommunicationPattern(src, irVertex),
-              src, irVertex, coder, true);
+              src, irVertex, true);
+          edge.setProperty(CoderProperty.of(pValueToCoder.get(pValue)));
           edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));
           builder.connectVertices(edge);
         });
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
index 1cf535a..8ad00d3 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
@@ -19,6 +19,7 @@ import edu.snu.nemo.client.JobLauncher;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.KeyExtractorProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
@@ -53,6 +54,8 @@ import java.util.Stack;
  * Utility class for RDDs.
  */
 public final class SparkFrontendUtils {
+  private static final KeyExtractorProperty SPARK_KEY_EXTRACTOR_PROP = KeyExtractorProperty.of(new SparkKeyExtractor());
+
   /**
    * Private constructor.
    */
@@ -96,8 +99,9 @@ public final class SparkFrontendUtils {
     builder.addVertex(collectVertex, loopVertexStack);
 
     final IREdge newEdge = new IREdge(getEdgeCommunicationPattern(lastVertex, collectVertex),
-        lastVertex, collectVertex, new SparkCoder(serializer));
-    newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor()));
+        lastVertex, collectVertex);
+    newEdge.setProperty(CoderProperty.of(new SparkCoder(serializer)));
+    newEdge.setProperty(SPARK_KEY_EXTRACTOR_PROP);
     builder.connectVertices(newEdge);
 
     // launch DAG
diff --git a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala
index bb1c496..0b61d7b 100644
--- a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala
+++ b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala
@@ -19,7 +19,8 @@ import java.util
 
 import edu.snu.nemo.common.dag.DAGBuilder
 import edu.snu.nemo.common.ir.edge.IREdge
-import edu.snu.nemo.common.ir.edge.executionproperty.KeyExtractorProperty
+import edu.snu.nemo.common.ir.edge.executionproperty.{CoderProperty, KeyExtractorProperty}
+import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty
 import edu.snu.nemo.common.ir.vertex.{IRVertex, LoopVertex, OperatorVertex}
 import edu.snu.nemo.compiler.frontend.spark.SparkKeyExtractor
 import edu.snu.nemo.compiler.frontend.spark.coder.SparkCoder
@@ -69,7 +70,9 @@ final class PairRDDFunctions[K: ClassTag, V: ClassTag] protected[rdd] (
     builder.addVertex(reduceByKeyVertex, loopVertexStack)
 
     val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(self.lastVertex, reduceByKeyVertex),
-      self.lastVertex, reduceByKeyVertex, new SparkCoder[Tuple2[K, V]](self.serializer))
+      self.lastVertex, reduceByKeyVertex)
+    newEdge.setProperty(
+      CoderProperty.of(new SparkCoder[Tuple2[K, V]](self.serializer)).asInstanceOf[ExecutionProperty[_]])
     newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor))
     builder.connectVertices(newEdge)
 
diff --git a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
index cfa144d..12318c5 100644
--- a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
+++ b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
@@ -20,7 +20,8 @@ import java.util
 import edu.snu.nemo.client.JobLauncher
 import edu.snu.nemo.common.dag.{DAG, DAGBuilder}
 import edu.snu.nemo.common.ir.edge.IREdge
-import edu.snu.nemo.common.ir.edge.executionproperty.KeyExtractorProperty
+import edu.snu.nemo.common.ir.edge.executionproperty.{CoderProperty, KeyExtractorProperty}
+import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty
 import edu.snu.nemo.common.ir.vertex.{IRVertex, LoopVertex, OperatorVertex}
 import edu.snu.nemo.compiler.frontend.spark.SparkKeyExtractor
 import edu.snu.nemo.compiler.frontend.spark.coder.SparkCoder
@@ -48,8 +49,11 @@ final class RDD[T: ClassTag] protected[rdd] (
     protected[rdd] val lastVertex: IRVertex,
     private val sourceRDD: Option[org.apache.spark.rdd.RDD[T]]) extends org.apache.spark.rdd.RDD[T](_sc, deps) {
 
-  private val loopVertexStack = new util.Stack[LoopVertex]
   protected[rdd] val serializer: Serializer = SparkFrontendUtils.deriveSerializerFrom(_sc)
+  private val loopVertexStack = new util.Stack[LoopVertex]
+  private val coderProperty: ExecutionProperty[_] =
+    CoderProperty.of(new SparkCoder[T](serializer)).asInstanceOf[ExecutionProperty[_]]
+  private val keyExtractorProperty: KeyExtractorProperty = KeyExtractorProperty.of(new SparkKeyExtractor)
 
   /**
    * Constructor without dependencies (not needed in Nemo RDD).
@@ -132,8 +136,9 @@ final class RDD[T: ClassTag] protected[rdd] (
     builder.addVertex(mapVertex, loopVertexStack)
 
     val newEdge: IREdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, mapVertex),
-      lastVertex, mapVertex, new SparkCoder[T](serializer))
-    newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor))
+      lastVertex, mapVertex)
+    newEdge.setProperty(coderProperty)
+    newEdge.setProperty(keyExtractorProperty)
     builder.connectVertices(newEdge)
 
     new RDD[U](_sc, builder.buildWithoutSourceSinkCheck, mapVertex, Option.empty)
@@ -150,8 +155,9 @@ final class RDD[T: ClassTag] protected[rdd] (
     builder.addVertex(flatMapVertex, loopVertexStack)
 
     val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, flatMapVertex),
-      lastVertex, flatMapVertex, new SparkCoder[T](serializer))
-    newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor))
+      lastVertex, flatMapVertex)
+    newEdge.setProperty(coderProperty)
+    newEdge.setProperty(keyExtractorProperty)
     builder.connectVertices(newEdge)
 
     new RDD[U](_sc, builder.buildWithoutSourceSinkCheck, flatMapVertex, Option.empty)
@@ -179,8 +185,9 @@ final class RDD[T: ClassTag] protected[rdd] (
     builder.addVertex(reduceVertex, loopVertexStack)
 
     val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, reduceVertex),
-      lastVertex, reduceVertex, new SparkCoder[T](serializer))
-    newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor))
+      lastVertex, reduceVertex)
+    newEdge.setProperty(coderProperty)
+    newEdge.setProperty(keyExtractorProperty)
 
     builder.connectVertices(newEdge)
     ReduceTransform.reduceIterator(
@@ -202,8 +209,9 @@ final class RDD[T: ClassTag] protected[rdd] (
 
     builder.addVertex(flatMapVertex, loopVertexStack)
     val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, flatMapVertex),
-      lastVertex, flatMapVertex, new SparkCoder[T](serializer))
-    newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor))
+      lastVertex, flatMapVertex)
+    newEdge.setProperty(coderProperty)
+    newEdge.setProperty(keyExtractorProperty)
 
     builder.connectVertices(newEdge)
     JobLauncher.launchDAG(builder.build)
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/CompiletimeOptimizer.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/CompiletimeOptimizer.java
index cadb443..0213242 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/CompiletimeOptimizer.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/CompiletimeOptimizer.java
@@ -71,7 +71,7 @@ public final class CompiletimeOptimizer {
       if ((passToApply instanceof AnnotatingPass && !checkAnnotatingPass(dag, processedDAG))
           || (passToApply instanceof ReshapingPass && !checkReshapingPass(dag, processedDAG))) {
         throw new CompileTimeOptimizationException(passToApply.getClass().getSimpleName()
-            + "is implemented in a way that doesn't follow its original intention of annotating or reshaping. "
+            + " is implemented in a way that doesn't follow its original intention of annotating or reshaping. "
             + "Modify it or use a general CompileTimePass");
       }
       // Save the processed JSON DAG.
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/MapReduceDisaggregationOptimization.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/MapReduceDisaggregationOptimization.java
index 6695ec1..8225df2 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/MapReduceDisaggregationOptimization.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/MapReduceDisaggregationOptimization.java
@@ -15,7 +15,6 @@
  */
 package edu.snu.nemo.compiler.optimizer.examples;
 
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
@@ -58,12 +57,10 @@ public final class MapReduceDisaggregationOptimization {
     builder.addVertex(map);
     builder.addVertex(reduce);
 
-    final IREdge edge1 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
-        source, map, Coder.DUMMY_CODER);
+    final IREdge edge1 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map);
     builder.connectVertices(edge1);
 
-    final IREdge edge2 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle,
-        map, reduce, Coder.DUMMY_CODER);
+    final IREdge edge2 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, map, reduce);
     builder.connectVertices(edge2);
 
     final DAG<IRVertex, IREdge> dag = builder.build();
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java
new file mode 100644
index 0000000..d4efdda
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (C) 2017 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+
+import java.util.Collections;
+
+/**
+ * Pass for initiating IREdge Coder ExecutionProperty with default dummy coder.
+ */
+public final class DefaultEdgeCoderPass extends AnnotatingPass {
+
+  private static final CoderProperty DEFAULT_CODER_PROPERTY = CoderProperty.of(Coder.DUMMY_CODER);
+
+  /**
+   * Default constructor.
+   */
+  public DefaultEdgeCoderPass() {
+    super(ExecutionProperty.Key.Coder, Collections.emptySet());
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+    dag.topologicalDo(irVertex ->
+        dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
+          if (irEdge.getProperty(ExecutionProperty.Key.Coder) == null) {
+            irEdge.setProperty(DEFAULT_CODER_PROPERTY);
+          }
+        }));
+    return dag;
+  }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
index d0de39d..5a5949c 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
@@ -31,6 +31,7 @@ public final class PrimitiveCompositePass extends CompositePass {
   public PrimitiveCompositePass() {
     super(Arrays.asList(
         new DefaultParallelismPass(), // annotating after reshaping passes, before stage partitioning
+        new DefaultEdgeCoderPass(),
         new DefaultStagePartitioningPass(),
         new ReviseInterStageEdgeDataStorePass(), // after stage partitioning
         new DefaultEdgeUsedDataHandlingPass(),
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
index 18757ae..117e1af 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
@@ -16,6 +16,7 @@
 package edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping;
 
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
@@ -147,7 +148,8 @@ public final class CommonSubexpressionEliminationPass extends ReshapingPass {
             outEdges.getOrDefault(ov, new HashSet<>()).forEach(e -> {
               outListToModify.remove(e);
               final IREdge newIrEdge = new IREdge(e.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-                  operatorVertexToUse, e.getDst(), e.getCoder());
+                  operatorVertexToUse, e.getDst());
+              newIrEdge.setProperty(CoderProperty.of(e.getProperty(ExecutionProperty.Key.Coder)));
               outListToModify.add(newIrEdge);
             });
             outEdges.remove(ov);
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java
index 32f8b5d..88e16d6 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java
@@ -18,6 +18,7 @@ package edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
@@ -65,10 +66,11 @@ public final class DataSkewReshapingPass extends ReshapingPass {
                 .equals(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern))) {
             // We then insert the dynamicOptimizationVertex between the vertex and incoming vertices.
             final IREdge newEdge = new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
-                edge.getSrc(), metricCollectionBarrierVertex, edge.getCoder());
+                edge.getSrc(), metricCollectionBarrierVertex);
+            newEdge.setProperty(CoderProperty.of(edge.getProperty(ExecutionProperty.Key.Coder)));
 
             final IREdge edgeToGbK = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-                metricCollectionBarrierVertex, v, edge.getCoder(), edge.isSideInput());
+                metricCollectionBarrierVertex, v, edge.isSideInput());
             edge.copyExecutionPropertiesTo(edgeToGbK);
             builder.connectVertices(newEdge);
             builder.connectVertices(edgeToGbK);
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java
index 02ede77..b913c77 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java
@@ -61,7 +61,7 @@ public final class LoopExtractionPass extends ReshapingPass {
 
   /**
    * This part groups each iteration of loops together by observing the LoopVertex assigned to primitive operators,
-   * which is assigned by the {@link edu.snu.nemo.client.beam.NemoPipelineVisitor}. This also shows in which depth of
+   * which is assigned by the NemoPipelineVisitor. This also shows in which depth of
    * nested loops the function handles. It recursively calls itself from the maximum depth until 0.
    * @param dag DAG to process
    * @param depth the depth of the stack to process. Must be greater than 0.
@@ -100,7 +100,7 @@ public final class LoopExtractionPass extends ReshapingPass {
                 srcLoopVertex.addDagOutgoingEdge(irEdge);
                 final IREdge edgeFromLoop =
                     new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-                        srcLoopVertex, operatorVertex, irEdge.getCoder(), irEdge.isSideInput());
+                        srcLoopVertex, operatorVertex, irEdge.isSideInput());
                 irEdge.copyExecutionPropertiesTo(edgeFromLoop);
                 builder.connectVertices(edgeFromLoop);
                 srcLoopVertex.mapEdgeWithLoop(edgeFromLoop, irEdge);
@@ -148,7 +148,7 @@ public final class LoopExtractionPass extends ReshapingPass {
         } else { // loop -> loop connection
           assignedLoopVertex.addDagIncomingEdge(irEdge);
           final IREdge edgeToLoop = new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-              srcLoopVertex, assignedLoopVertex, irEdge.getCoder(), irEdge.isSideInput());
+              srcLoopVertex, assignedLoopVertex, irEdge.isSideInput());
           irEdge.copyExecutionPropertiesTo(edgeToLoop);
           builder.connectVertices(edgeToLoop);
           assignedLoopVertex.mapEdgeWithLoop(edgeToLoop, irEdge);
@@ -156,7 +156,7 @@ public final class LoopExtractionPass extends ReshapingPass {
       } else { // operator -> loop
         assignedLoopVertex.addDagIncomingEdge(irEdge);
         final IREdge edgeToLoop = new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-            irEdge.getSrc(), assignedLoopVertex, irEdge.getCoder(), irEdge.isSideInput());
+            irEdge.getSrc(), assignedLoopVertex, irEdge.isSideInput());
         irEdge.copyExecutionPropertiesTo(edgeToLoop);
         builder.connectVertices(edgeToLoop);
         assignedLoopVertex.mapEdgeWithLoop(edgeToLoop, irEdge);
@@ -227,13 +227,13 @@ public final class LoopExtractionPass extends ReshapingPass {
 
               // add the new IREdge to the iterative incoming edges list.
               final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-                  equivalentSrcVertex, equivalentDstVertex, edge.getCoder(), edge.isSideInput());
+                  equivalentSrcVertex, equivalentDstVertex, edge.isSideInput());
               edge.copyExecutionPropertiesTo(newIrEdge);
               finalRootLoopVertex.addIterativeIncomingEdge(newIrEdge);
             } else {
               // src is from outside the previous loop. vertex outside previous loop -> DAG.
               final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-                  srcVertex, equivalentDstVertex, edge.getCoder(), edge.isSideInput());
+                  srcVertex, equivalentDstVertex, edge.isSideInput());
               edge.copyExecutionPropertiesTo(newIrEdge);
               finalRootLoopVertex.addNonIterativeIncomingEdge(newIrEdge);
             }
@@ -246,7 +246,7 @@ public final class LoopExtractionPass extends ReshapingPass {
             final IRVertex equivalentSrcVertex = equivalentVertices.get(srcVertex);
 
             final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-                equivalentSrcVertex, dstVertex, edge.getCoder(), edge.isSideInput());
+                equivalentSrcVertex, dstVertex, edge.isSideInput());
             edge.copyExecutionPropertiesTo(newIrEdge);
             finalRootLoopVertex.addDagOutgoingEdge(newIrEdge);
             finalRootLoopVertex.mapEdgeWithLoop(loopVertex.getEdgeWithLoop(edge), newIrEdge);
@@ -291,7 +291,7 @@ public final class LoopExtractionPass extends ReshapingPass {
         builder.connectVertices(edge);
       } else {
         final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-            firstEquivalentVertex, irVertex, edge.getCoder(), edge.isSideInput());
+            firstEquivalentVertex, irVertex, edge.isSideInput());
         edge.copyExecutionPropertiesTo(newIrEdge);
         builder.connectVertices(newIrEdge);
       }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
index d2aaeee..1e5f23f 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
@@ -16,6 +16,7 @@
 package edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping;
 
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.LoopVertex;
 import edu.snu.nemo.common.dag.DAG;
@@ -159,7 +160,7 @@ public final class LoopOptimizations {
             inEdges.getOrDefault(loopVertex, new ArrayList<>()).forEach(irEdge -> {
               if (builder.contains(irEdge.getSrc())) {
                 final IREdge newIREdge = new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-                    irEdge.getSrc(), newLoopVertex, irEdge.getCoder(), irEdge.isSideInput());
+                    irEdge.getSrc(), newLoopVertex, irEdge.isSideInput());
                 irEdge.copyExecutionPropertiesTo(newIREdge);
                 builder.connectVertices(newIREdge);
               }
@@ -168,7 +169,7 @@ public final class LoopOptimizations {
             outEdges.getOrDefault(loopVertex, new ArrayList<>()).forEach(irEdge -> {
               if (builder.contains(irEdge.getDst())) {
                 final IREdge newIREdge = new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-                    newLoopVertex, irEdge.getDst(), irEdge.getCoder(), irEdge.isSideInput());
+                    newLoopVertex, irEdge.getDst(), irEdge.isSideInput());
                 irEdge.copyExecutionPropertiesTo(newIREdge);
                 builder.connectVertices(newIREdge);
               }
@@ -285,8 +286,10 @@ public final class LoopOptimizations {
               candidate.getValue().stream().map(IREdge::getSrc).anyMatch(edgeSrc -> edgeSrc.equals(e.getSrc())))
               .forEach(edge -> {
                 edgesToRemove.add(edge);
-                edgesToAdd.add(new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-                    candidate.getKey(), edge.getDst(), edge.getCoder(), edge.isSideInput()));
+                final IREdge newEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
+                    candidate.getKey(), edge.getDst(), edge.isSideInput());
+                newEdge.setProperty(CoderProperty.of(edge.getProperty(ExecutionProperty.Key.Coder)));
+                edgesToAdd.add(newEdge);
               });
           final List<IREdge> listToModify = inEdges.getOrDefault(loopVertex, new ArrayList<>());
           listToModify.removeAll(edgesToRemove);
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
index 7770065..25f941c 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
@@ -17,6 +17,7 @@ package edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping;
 
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
@@ -59,10 +60,11 @@ public final class SailfishRelayReshapingPass extends ReshapingPass {
             final OperatorVertex iFileMergerVertex = new OperatorVertex(new RelayTransform());
             builder.addVertex(iFileMergerVertex);
             final IREdge newEdgeToMerger = new IREdge(DataCommunicationPatternProperty.Value.Shuffle,
-                edge.getSrc(), iFileMergerVertex, edge.getCoder(), edge.isSideInput());
-            final IREdge newEdgeFromMerger = new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
-                iFileMergerVertex, v, edge.getCoder());
+                edge.getSrc(), iFileMergerVertex, edge.isSideInput());
             edge.copyExecutionPropertiesTo(newEdgeToMerger);
+            final IREdge newEdgeFromMerger = new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
+                iFileMergerVertex, v);
+            newEdgeFromMerger.setProperty(CoderProperty.of(edge.getProperty(ExecutionProperty.Key.Coder)));
             builder.connectVertices(newEdgeToMerger);
             builder.connectVertices(newEdgeFromMerger);
           } else {
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index adaf752..3e791dc 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -186,8 +186,7 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
                 irEdge.getId(),
                 irEdge.getExecutionProperties(),
                 irEdge.getSrc(),
-                irEdge.getDst(),
-                irEdge.getCoder()));
+                irEdge.getDst()));
           } else { // edge comes from another stage
             final Stage srcStage = vertexStageMap.get(srcVertex);
 
@@ -201,7 +200,6 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
                 .setSrcVertex(srcVertex)
                 .setDstVertex(dstVertex)
                 .setSrcStage(srcStage)
-                .setCoder(irEdge.getCoder())
                 .setSideInputFlag(irEdge.isSideInput());
             currentStageIncomingEdges.add(newEdgeBuilder);
           }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java
index c86d1e7..528beb8 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java
@@ -15,7 +15,6 @@
  */
 package edu.snu.nemo.runtime.common.plan;
 
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.dag.Edge;
 import edu.snu.nemo.common.dag.Vertex;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
@@ -26,69 +25,60 @@ import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
  * @param <V> the vertex type.
  */
 public class RuntimeEdge<V extends Vertex> extends Edge<V> {
-  private final ExecutionPropertyMap edgeProperties;
-  private final Coder coder;
+  private final ExecutionPropertyMap executionProperties;
   private final Boolean isSideInput;
 
   /**
    * Constructs the edge given the below parameters.
-   * @param runtimeEdgeId the id of this edge.
-   * @param edgeProperties to control the data flow on this edge.
-   * @param src the source vertex.
-   * @param dst the destination vertex.
-   * @param coder coder.
+   * This constructor assumes that this edge is not for a side input.
+   *
+   * @param runtimeEdgeId       the id of this edge.
+   * @param executionProperties to control the data flow on this edge.
+   * @param src                 the source vertex.
+   * @param dst                 the destination vertex.
    */
   public RuntimeEdge(final String runtimeEdgeId,
-                     final ExecutionPropertyMap edgeProperties,
+                     final ExecutionPropertyMap executionProperties,
                      final V src,
-                     final V dst,
-                     final Coder coder) {
-    this(runtimeEdgeId, edgeProperties, src, dst, coder, false);
+                     final V dst) {
+    this(runtimeEdgeId, executionProperties, src, dst, false);
   }
 
   /**
    * Constructs the edge given the below parameters.
-   * @param runtimeEdgeId the id of this edge.
-   * @param edgeProperties to control the data flow on this edge.
-   * @param src the source vertex.
-   * @param dst the destination vertex.
-   * @param coder coder.
-   * @param isSideInput Whether or not the RuntimeEdge is a side input edge.
+   *
+   * @param runtimeEdgeId  the id of this edge.
+   * @param executionProperties to control the data flow on this edge.
+   * @param src            the source vertex.
+   * @param dst            the destination vertex.
+   * @param isSideInput    Whether or not the RuntimeEdge is a side input edge.
    */
   public RuntimeEdge(final String runtimeEdgeId,
-                     final ExecutionPropertyMap edgeProperties,
+                     final ExecutionPropertyMap executionProperties,
                      final V src,
                      final V dst,
-                     final Coder coder,
                      final Boolean isSideInput) {
     super(runtimeEdgeId, src, dst);
-    this.edgeProperties = edgeProperties;
-    this.coder = coder;
+    this.executionProperties = executionProperties;
     this.isSideInput = isSideInput;
   }
 
   /**
    * Get the execution property of the Runtime Edge.
-   * @param <T> Type of the return value.
+   *
+   * @param <T>                  Type of the return value.
    * @param executionPropertyKey key of the execution property.
    * @return the execution property.
    */
   public final <T> T getProperty(final ExecutionProperty.Key executionPropertyKey) {
-    return edgeProperties.get(executionPropertyKey);
+    return executionProperties.get(executionPropertyKey);
   }
 
   /**
    * @return the ExecutionPropertyMap of the Runtime Edge.
    */
   public final ExecutionPropertyMap getExecutionProperties() {
-    return edgeProperties;
-  }
-
-  /**
-   * @return the coder for encoding and decoding.
-   */
-  public final Coder getCoder() {
-    return coder;
+    return executionProperties;
   }
 
   /**
@@ -106,9 +96,8 @@ public class RuntimeEdge<V extends Vertex> extends Edge<V> {
   public String propertiesToJSON() {
     final StringBuilder sb = new StringBuilder();
     sb.append("{\"runtimeEdgeId\": \"").append(getId());
-    sb.append("\", \"edgeProperties\": ").append(edgeProperties);
-    sb.append(", \"coder\": \"").append(coder.toString());
-    sb.append("\"}");
+    sb.append("\", \"executionProperties\": ").append(executionProperties);
+    sb.append("}");
     return sb.toString();
   }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
index 3b8cdf0..ec73077 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
@@ -15,7 +15,6 @@
  */
 package edu.snu.nemo.runtime.common.plan;
 
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.runtime.common.data.KeyRange;
@@ -48,24 +47,23 @@ public final class StageEdge extends RuntimeEdge<Stage> {
 
   /**
    * Constructor.
-   * @param runtimeEdgeId id of the runtime edge.
+   *
+   * @param runtimeEdgeId  id of the runtime edge.
    * @param edgeProperties edge execution properties.
-   * @param srcVertex source IRVertex in the srcStage of this edge.
-   * @param dstVertex destination IRVertex in the dstStage of this edge.
-   * @param srcStage source stage.
-   * @param dstStage destination stage.
-   * @param coder the coder for enconding and deconding.
-   * @param isSideInput whether or not the edge is a sideInput edge.
+   * @param srcVertex      source IRVertex in the srcStage of this edge.
+   * @param dstVertex      destination IRVertex in the dstStage of this edge.
+   * @param srcStage       source stage.
+   * @param dstStage       destination stage.
+   * @param isSideInput    whether or not the edge is a sideInput edge.
    */
-  public StageEdge(final String runtimeEdgeId,
-                   final ExecutionPropertyMap edgeProperties,
-                   final IRVertex srcVertex,
-                   final IRVertex dstVertex,
-                   final Stage srcStage,
-                   final Stage dstStage,
-                   final Coder coder,
-                   final Boolean isSideInput) {
-    super(runtimeEdgeId, edgeProperties, srcStage, dstStage, coder, isSideInput);
+  StageEdge(final String runtimeEdgeId,
+            final ExecutionPropertyMap edgeProperties,
+            final IRVertex srcVertex,
+            final IRVertex dstVertex,
+            final Stage srcStage,
+            final Stage dstStage,
+            final Boolean isSideInput) {
+    super(runtimeEdgeId, edgeProperties, srcStage, dstStage, isSideInput);
     this.srcVertex = srcVertex;
     this.dstVertex = dstVertex;
     // Initialize the key range of each dst task.
@@ -96,7 +94,6 @@ public final class StageEdge extends RuntimeEdge<Stage> {
     sb.append("\", \"edgeProperties\": ").append(getExecutionProperties());
     sb.append(", \"externalSrcVertexId\": \"").append(srcVertex.getId());
     sb.append("\", \"externalDstVertexId\": \"").append(dstVertex.getId());
-    sb.append("\", \"coder\": \"").append(getCoder().toString());
     sb.append("\"}");
     return sb.toString();
   }
@@ -110,6 +107,7 @@ public final class StageEdge extends RuntimeEdge<Stage> {
 
   /**
    * Sets the task idx to key range list.
+   *
    * @param taskIdxToKeyRange the list to set.
    */
   public void setTaskIdxToKeyRange(final List<KeyRange> taskIdxToKeyRange) {
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java
index b7448a1..5630b71 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java
@@ -15,8 +15,6 @@
  */
 package edu.snu.nemo.runtime.common.plan;
 
-
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 
@@ -30,19 +28,20 @@ public final class StageEdgeBuilder {
   private Stage dstStage;
   private IRVertex srcVertex;
   private IRVertex dstVertex;
-  private Coder coder;
   private Boolean isSideInput;
 
   /**
    * Represents the edge between vertices in a logical plan.
+   *
    * @param irEdgeId id of this edge.
    */
-  StageEdgeBuilder(final String irEdgeId) {
+  public StageEdgeBuilder(final String irEdgeId) {
     this.stageEdgeId = irEdgeId;
   }
 
   /**
    * Setter for edge properties.
+   *
    * @param ea the edge properties.
    * @return the updated StageEdgeBuilder.
    */
@@ -53,6 +52,7 @@ public final class StageEdgeBuilder {
 
   /**
    * Setter for the source stage.
+   *
    * @param ss the source stage.
    * @return the updated StageEdgeBuilder.
    */
@@ -63,6 +63,7 @@ public final class StageEdgeBuilder {
 
   /**
    * Setter for the destination stage.
+   *
    * @param ds the destination stage.
    * @return the updated StageEdgeBuilder.
    */
@@ -73,6 +74,7 @@ public final class StageEdgeBuilder {
 
   /**
    * Setter for the source vertex.
+   *
    * @param sv the source vertex.
    * @return the updated StageEdgeBuilder.
    */
@@ -83,6 +85,7 @@ public final class StageEdgeBuilder {
 
   /**
    * Setter for the destination vertex.
+   *
    * @param dv the destination vertex.
    * @return the updated StageEdgeBuilder.
    */
@@ -92,17 +95,8 @@ public final class StageEdgeBuilder {
   }
 
   /**
-   * Setter for coder.
-   * @param c the coder.
-   * @return the updated StageEdgeBuilder.
-   */
-  public StageEdgeBuilder setCoder(final Coder c) {
-    this.coder = c;
-    return this;
-  }
-
-  /**
    * Setter for side input flag.
+   *
    * @param sideInputFlag the side input flag.
    * @return the updated StageEdgeBuilder.
    */
@@ -111,7 +105,10 @@ public final class StageEdgeBuilder {
     return this;
   }
 
+  /**
+   * @return the built stage edge.
+   */
   public StageEdge build() {
-    return new StageEdge(stageEdgeId, edgeProperties, srcVertex, dstVertex, srcStage, dstStage, coder, isSideInput);
+    return new StageEdge(stageEdgeId, edgeProperties, srcVertex, dstVertex, srcStage, dstStage, isSideInput);
   }
 }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
index 8b2925d..155a9b0 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
@@ -17,6 +17,7 @@ package edu.snu.nemo.runtime.executor;
 
 import com.google.protobuf.ByteString;
 import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.exception.IllegalMessageException;
@@ -105,13 +106,13 @@ public final class Executor {
       final TaskStateManager taskStateManager =
           new TaskStateManager(task, executorId, persistentConnectionToMasterMap, metricMessageSender);
 
-      task.getTaskIncomingEdges()
-          .forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
-      task.getTaskOutgoingEdges()
-          .forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
+      task.getTaskIncomingEdges().forEach(e -> serializerManager.register(e.getId(),
+          e.getProperty(ExecutionProperty.Key.Coder), e.getExecutionProperties()));
+      task.getTaskOutgoingEdges().forEach(e -> serializerManager.register(e.getId(),
+          e.getProperty(ExecutionProperty.Key.Coder), e.getExecutionProperties()));
       irDag.getVertices().forEach(v -> {
-        irDag.getOutgoingEdgesOf(v)
-            .forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
+        irDag.getOutgoingEdgesOf(v).forEach(e -> serializerManager.register(e.getId(),
+            e.getProperty(ExecutionProperty.Key.Coder), e.getExecutionProperties()));
       });
 
       new TaskExecutor(
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
index f2c0082..2c4a2ac 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -260,10 +260,9 @@ public final class TaskExecutorTest {
                                            final IRVertex dst,
                                            final boolean isSideInput) {
     final String runtimeIREdgeId = "Runtime edge between operator tasks";
-    final Coder coder = Coder.DUMMY_CODER;
     ExecutionPropertyMap edgeProperties = new ExecutionPropertyMap(runtimeIREdgeId);
     edgeProperties.put(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
-    return new RuntimeEdge<>(runtimeIREdgeId, edgeProperties, src, dst, coder, isSideInput);
+    return new RuntimeEdge<>(runtimeIREdgeId, edgeProperties, src, dst, isSideInput);
 
   }
 
diff --git a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
index 9d4142d..4888a1f 100644
--- a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
+++ b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
@@ -15,7 +15,6 @@
  */
 package edu.snu.nemo.runtime.plangenerator;
 
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
@@ -136,16 +135,16 @@ public final class TestPlanGenerator {
     v5.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
     dagBuilder.addVertex(v5);
 
-    final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, Coder.DUMMY_CODER);
+    final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2);
     dagBuilder.connectVertices(e1);
 
-    final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v2, Coder.DUMMY_CODER);
+    final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v2);
     dagBuilder.connectVertices(e2);
 
-    final IREdge e3 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4, Coder.DUMMY_CODER);
+    final IREdge e3 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4);
     dagBuilder.connectVertices(e3);
 
-    final IREdge e4 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v5, Coder.DUMMY_CODER);
+    final IREdge e4 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v5);
     dagBuilder.connectVertices(e4);
 
     return dagBuilder.buildWithoutSourceSinkCheck();
@@ -182,10 +181,10 @@ public final class TestPlanGenerator {
     }
     dagBuilder.addVertex(v3);
 
-    final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, Coder.DUMMY_CODER);
+    final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2);
     dagBuilder.connectVertices(e1);
 
-    final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v2, v3, Coder.DUMMY_CODER);
+    final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v2, v3);
     dagBuilder.connectVertices(e2);
 
     return dagBuilder.buildWithoutSourceSinkCheck();
diff --git a/tests/src/test/java/edu/snu/nemo/tests/common/ir/LoopVertexTest.java b/tests/src/test/java/edu/snu/nemo/tests/common/ir/LoopVertexTest.java
index 0368499..919530d 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/common/ir/LoopVertexTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/common/ir/LoopVertexTest.java
@@ -54,26 +54,17 @@ public class LoopVertexTest {
     final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
 
     loopDAGBuilder.addVertex(map1).addVertex(groupByKey).addVertex(combine).addVertex(map2)
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle,
-            map1, groupByKey, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
-            groupByKey, combine, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
-            combine, map2, Coder.DUMMY_CODER));
-    loopVertex.addDagIncomingEdge(new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
-        source, map1, Coder.DUMMY_CODER));
-    loopVertex.addIterativeIncomingEdge(new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
-        map2, map1, Coder.DUMMY_CODER));
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle, map1, groupByKey))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, groupByKey, combine))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, combine, map2));
+    loopVertex.addDagIncomingEdge(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map1));
+    loopVertex.addIterativeIncomingEdge(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, map2, map1));
 
     originalDAG = builder.addVertex(source).addVertex(map1).addVertex(groupByKey).addVertex(combine).addVertex(map2)
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
-            source, map1, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle,
-            map1, groupByKey, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
-            groupByKey, combine, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
-            combine, map2, Coder.DUMMY_CODER))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map1))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle, map1, groupByKey))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, groupByKey, combine))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, combine, map2))
         .build();
   }
 
diff --git a/tests/src/test/java/edu/snu/nemo/tests/common/ir/executionproperty/ExecutionPropertyMapTest.java b/tests/src/test/java/edu/snu/nemo/tests/common/ir/executionproperty/ExecutionPropertyMapTest.java
index 8f0c153..f6f089f 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/common/ir/executionproperty/ExecutionPropertyMapTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/common/ir/executionproperty/ExecutionPropertyMapTest.java
@@ -17,6 +17,7 @@ package edu.snu.nemo.tests.common.ir.executionproperty;
 
 import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
@@ -39,8 +40,7 @@ public class ExecutionPropertyMapTest {
   private final IRVertex source = new EmptyComponents.EmptySourceVertex<>("Source");
   private final IRVertex destination = new OperatorVertex(new EmptyComponents.EmptyTransform("MapElements"));
   private final DataCommunicationPatternProperty.Value comPattern = DataCommunicationPatternProperty.Value.OneToOne;
-  private final IREdge edge = new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
-      source, destination, Coder.DUMMY_CODER);
+  private final IREdge edge = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, destination);
 
   private ExecutionPropertyMap edgeMap;
   private ExecutionPropertyMap vertexMap;
@@ -65,6 +65,8 @@ public class ExecutionPropertyMapTest {
     assertEquals(DataStoreProperty.Value.MemoryStore, edgeMap.get(ExecutionProperty.Key.DataStore));
     edgeMap.put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
     assertEquals(DataFlowModelProperty.Value.Pull, edgeMap.get(ExecutionProperty.Key.DataFlowModel));
+    edgeMap.put(CoderProperty.of(Coder.DUMMY_CODER));
+    assertEquals(Coder.DUMMY_CODER, edgeMap.get(ExecutionProperty.Key.Coder));
 
     edgeMap.remove(ExecutionProperty.Key.DataFlowModel);
     assertNull(edgeMap.get(ExecutionProperty.Key.DataFlowModel));
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
index ebbc7b6..cb4b70c 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
@@ -20,7 +20,6 @@ import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.compiler.backend.nemo.NemoBackend;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
@@ -54,12 +53,10 @@ public final class NemoBackendTest<I, O> {
   @Before
   public void setUp() throws Exception {
     this.dag = builder.addVertex(source).addVertex(map1).addVertex(groupByKey).addVertex(combine).addVertex(map2)
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map1, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle,
-            map1, groupByKey, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
-            groupByKey, combine, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, combine, map2, Coder.DUMMY_CODER))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map1))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle, map1, groupByKey))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, groupByKey, combine))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, combine, map2))
         .build();
 
     this.dag = CompiletimeOptimizer.optimize(dag, new PadoPolicy(), EMPTY_DAG_DIRECTORY);
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPassTest.java
new file mode 100644
index 0000000..6eec026
--- /dev/null
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPassTest.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright (C) 2017 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.tests.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.client.JobLauncher;
+import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AnnotatingPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultEdgeCoderPass;
+import edu.snu.nemo.tests.compiler.CompilerTestUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test {@link edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultEdgeCoderPass}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+public class DefaultEdgeCoderPassTest {
+  private DAG<IRVertex, IREdge> compiledDAG;
+
+  @Before
+  public void setUp() throws Exception {
+    compiledDAG = CompilerTestUtil.compileMRDAG();
+  }
+
+  @Test
+  public void testAnnotatingPass() {
+    final AnnotatingPass coderPass = new DefaultEdgeCoderPass();
+    assertEquals(ExecutionProperty.Key.Coder, coderPass.getExecutionPropertyToModify());
+  }
+
+  @Test
+  public void testNotOverride() {
+    // Get the first coder from the compiled DAG
+    final Coder compiledCoder = compiledDAG
+        .getOutgoingEdgesOf(compiledDAG.getTopologicalSort().get(0)).get(0).getProperty(ExecutionProperty.Key.Coder);
+    final DAG<IRVertex, IREdge> processedDAG = new DefaultEdgeCoderPass().apply(compiledDAG);
+
+    // Get the first coder from the processed DAG
+    final Coder processedCoder = processedDAG
+        .getOutgoingEdgesOf(processedDAG.getTopologicalSort().get(0)).get(0).getProperty(ExecutionProperty.Key.Coder);
+    assertEquals(compiledCoder, processedCoder); // It must not be changed.
+  }
+
+  @Test
+  public void testSetToDefault() throws Exception {
+    // Remove the first coder from the compiled DAG (to let our pass to set as default coder).
+    compiledDAG.getOutgoingEdgesOf(compiledDAG.getTopologicalSort().get(0))
+        .get(0).getExecutionProperties().remove(ExecutionProperty.Key.Coder);
+    final DAG<IRVertex, IREdge> processedDAG = new DefaultEdgeCoderPass().apply(compiledDAG);
+
+    // Check whether the pass set the empty coder to our default coder.
+    final Coder processedCoder = processedDAG
+        .getOutgoingEdgesOf(processedDAG.getTopologicalSort().get(0)).get(0).getProperty(ExecutionProperty.Key.Coder);
+    assertEquals(Coder.DUMMY_CODER, processedCoder);
+  }
+}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPassTest.java
index 2be50db..6c8a52a 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPassTest.java
@@ -38,7 +38,7 @@ import static org.junit.Assert.assertEquals;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
 public class DefaultParallelismPassTest {
-  DAG<IRVertex, IREdge> compiledDAG;
+  private DAG<IRVertex, IREdge> compiledDAG;
 
   @Before
   public void setUp() throws Exception {
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPassTest.java
index d6f6353..28fc29f 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPassTest.java
@@ -16,7 +16,6 @@
 package edu.snu.nemo.tests.compiler.optimizer.pass.compiletime.reshaping;
 
 import edu.snu.nemo.client.JobLauncher;
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
@@ -58,16 +57,16 @@ public class CommonSubexpressionEliminationPassTest {
     final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
     dagNotToOptimize = dagBuilder.addVertex(source).addVertex(map1).addVertex(groupByKey).addVertex(combine)
         .addVertex(map2)
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map1, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle, map1, groupByKey, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, groupByKey, combine, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, combine, map2, Coder.DUMMY_CODER))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map1))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle, map1, groupByKey))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, groupByKey, combine))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, combine, map2))
         .build();
     dagToOptimize = dagBuilder.addVertex(map1clone).addVertex(groupByKey2).addVertex(combine2).addVertex(map22)
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map1clone, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle, map1clone, groupByKey2, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, groupByKey2, combine2, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, combine2, map22, Coder.DUMMY_CODER))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map1clone))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle, map1clone, groupByKey2))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, groupByKey2, combine2))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, combine2, map22))
         .build();
   }
 
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
index e670c4d..159feba 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
@@ -19,6 +19,7 @@ import edu.snu.nemo.client.JobLauncher;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.LoopVertex;
@@ -101,7 +102,8 @@ public class LoopFusionPassTest {
    * This method adds a LoopVertex at the end of the DAG (no more outgoing edges), after the
    * {@param vertexToBeFollowed}. We assume, as in the MLR, ALS DAG, that iterative incoming edges work to receive
    * main inputs, and non-iterative incoming edges work to receive side inputs.
-   * @param builder builder to add the LoopVertex to.
+   *
+   * @param builder            builder to add the LoopVertex to.
    * @param vertexToBeFollowed vertex that is to be followed by the LoopVertex.
    * @param loopVertexToFollow the new LoopVertex that will be added.
    */
@@ -111,12 +113,14 @@ public class LoopFusionPassTest {
     builder.addVertex(loopVertexToFollow);
     loopVertexToFollow.getIterativeIncomingEdges().values().forEach(irEdges -> irEdges.forEach(irEdge -> {
       final IREdge newIREdge = new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-          vertexToBeFollowed, loopVertexToFollow, irEdge.getCoder());
+          vertexToBeFollowed, loopVertexToFollow);
+      newIREdge.setProperty(CoderProperty.of(irEdge.getProperty(ExecutionProperty.Key.Coder)));
       builder.connectVertices(newIREdge);
     }));
     loopVertexToFollow.getNonIterativeIncomingEdges().values().forEach(irEdges -> irEdges.forEach(irEdge -> {
       final IREdge newIREdge = new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-          irEdge.getSrc(), loopVertexToFollow, irEdge.getCoder());
+          irEdge.getSrc(), loopVertexToFollow);
+      newIREdge.setProperty(CoderProperty.of(irEdge.getProperty(ExecutionProperty.Key.Coder)));
       builder.connectVertices(newIREdge);
     }));
   }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
index 4df2fbd..18feb79 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
@@ -19,6 +19,7 @@ import edu.snu.nemo.client.JobLauncher;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.LoopVertex;
@@ -87,11 +88,11 @@ public class LoopInvariantCodeMotionPassTest {
           if (!e.getSrc().equals(vertex7)) {
             builder.connectVertices(e);
           } else {
-            final Optional<IREdge> theIncomingEdge = newDAGIncomingEdge.stream().findFirst();
-            assertTrue(theIncomingEdge.isPresent());
-            final IREdge newIREdge =
-                new IREdge(theIncomingEdge.get().getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-                    theIncomingEdge.get().getSrc(), alsLoop, theIncomingEdge.get().getCoder());
+            final Optional<IREdge> incomingEdge = newDAGIncomingEdge.stream().findFirst();
+            assertTrue(incomingEdge.isPresent());
+            final IREdge newIREdge = new IREdge(incomingEdge.get().getProperty(
+                ExecutionProperty.Key.DataCommunicationPattern), incomingEdge.get().getSrc(), alsLoop);
+            newIREdge.setProperty(CoderProperty.of(incomingEdge.get().getProperty(ExecutionProperty.Key.Coder)));
             builder.connectVertices(newIREdge);
           }
         });
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
index 287e15b..793c182 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
@@ -29,21 +29,21 @@ public final class PolicyBuilderTest {
   @Test
   public void testDisaggregationPolicy() {
     final Policy disaggregationPolicy = new DisaggregationPolicy();
-    assertEquals(12, disaggregationPolicy.getCompileTimePasses().size());
+    assertEquals(13, disaggregationPolicy.getCompileTimePasses().size());
     assertEquals(0, disaggregationPolicy.getRuntimePasses().size());
   }
 
   @Test
   public void testPadoPolicy() {
     final Policy padoPolicy = new PadoPolicy();
-    assertEquals(14, padoPolicy.getCompileTimePasses().size());
+    assertEquals(15, padoPolicy.getCompileTimePasses().size());
     assertEquals(0, padoPolicy.getRuntimePasses().size());
   }
 
   @Test
   public void testDataSkewPolicy() {
     final Policy dataSkewPolicy = new DataSkewPolicy();
-    assertEquals(16, dataSkewPolicy.getCompileTimePasses().size());
+    assertEquals(17, dataSkewPolicy.getCompileTimePasses().size());
     assertEquals(1, dataSkewPolicy.getRuntimePasses().size());
   }
 
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
index f948589..677b2e9 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
@@ -72,7 +72,7 @@ public final class DAGConverterTest {
     v2.setProperty(ParallelismProperty.of(2));
     irDAGBuilder.addVertex(v2);
 
-    final IREdge e = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, Coder.DUMMY_CODER);
+    final IREdge e = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2);
     irDAGBuilder.connectVertices(e);
 
     final DAG<IRVertex, IREdge> irDAG = CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(),
@@ -155,27 +155,27 @@ public final class DAGConverterTest {
     // TODO #13: Implement Join Node
 //    irDAGBuilder.addVertex(v7);
 
-    final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v1, v2, Coder.DUMMY_CODER);
+    final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v1, v2);
     e1.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
     e1.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
 
-    final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v1, v3, Coder.DUMMY_CODER);
+    final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v1, v3);
     e2.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
     e2.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
 
-    final IREdge e3 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4, Coder.DUMMY_CODER);
+    final IREdge e3 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4);
     e3.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
     e3.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Push));
 
-    final IREdge e4 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v5, Coder.DUMMY_CODER);
+    final IREdge e4 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v5);
     e4.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
     e4.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Push));
 
-    final IREdge e5 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v6, Coder.DUMMY_CODER);
+    final IREdge e5 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v6);
     e5.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
     e5.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
 
-    final IREdge e6 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v8, Coder.DUMMY_CODER);
+    final IREdge e6 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v8);
     e6.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
     e6.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
 
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
index 697122e..2a8a224 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
@@ -39,6 +39,7 @@ import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.Stage;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
+import edu.snu.nemo.runtime.common.plan.StageEdgeBuilder;
 import edu.snu.nemo.runtime.executor.Executor;
 import edu.snu.nemo.runtime.executor.MetricManagerWorker;
 import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
@@ -308,22 +309,30 @@ public final class DataTransferTest {
     final IRVertex dstVertex = verticesPair.right();
 
     // Edge setup
-    final IREdge dummyIREdge = new IREdge(commPattern, srcVertex, dstVertex, CODER);
+    final IREdge dummyIREdge = new IREdge(commPattern, srcVertex, dstVertex);
+    dummyIREdge.setProperty(CoderProperty.of(CODER));
     dummyIREdge.setProperty(KeyExtractorProperty.of((element -> element)));
     final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
     edgeProperties.put(DataCommunicationPatternProperty.of(commPattern));
     edgeProperties.put(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
-
     edgeProperties.put(DataStoreProperty.of(store));
     edgeProperties.put(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
+    edgeProperties.put(CoderProperty.of(CODER));
     final RuntimeEdge dummyEdge;
 
     final IRVertex srcMockVertex = mock(IRVertex.class);
     final IRVertex dstMockVertex = mock(IRVertex.class);
     final Stage srcStage = setupStages("srcStage-" + testIndex);
     final Stage dstStage = setupStages("dstStage-" + testIndex);
-    dummyEdge = new StageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex,
-        srcStage, dstStage, CODER, false);
+    dummyEdge = new StageEdgeBuilder(edgeId)
+        .setEdgeProperties(edgeProperties)
+        .setSrcVertex(srcMockVertex)
+        .setDstVertex(dstMockVertex)
+        .setSrcStage(srcStage)
+        .setDstStage(dstStage)
+        .setSideInputFlag(false)
+        .build();
+
     // Initialize states in Master
     srcStage.getTaskIds().forEach(srcTaskId -> {
       final String blockId = RuntimeIdGenerator.generateBlockId(
@@ -391,7 +400,8 @@ public final class DataTransferTest {
     final IRVertex dstVertex = verticesPair.right();
 
     // Edge setup
-    final IREdge dummyIREdge = new IREdge(commPattern, srcVertex, dstVertex, CODER);
+    final IREdge dummyIREdge = new IREdge(commPattern, srcVertex, dstVertex);
+    dummyIREdge.setProperty(CoderProperty.of(CODER));
     dummyIREdge.setProperty(KeyExtractorProperty.of((element -> element)));
     final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
     edgeProperties.put(DataCommunicationPatternProperty.of(commPattern));
@@ -409,12 +419,24 @@ public final class DataTransferTest {
     final IRVertex dstMockVertex = mock(IRVertex.class);
     final Stage srcStage = setupStages("srcStage-" + testIndex);
     final Stage dstStage = setupStages("dstStage-" + testIndex);
-    dummyEdge = new StageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex,
-        srcStage, dstStage, CODER, false);
+    dummyEdge = new StageEdgeBuilder(edgeId)
+        .setEdgeProperties(edgeProperties)
+        .setSrcVertex(srcMockVertex)
+        .setDstVertex(dstMockVertex)
+        .setSrcStage(srcStage)
+        .setDstStage(dstStage)
+        .setSideInputFlag(false)
+        .build();
     final IRVertex dstMockVertex2 = mock(IRVertex.class);
     final Stage dstStage2 = setupStages("dstStage-" + testIndex2);
-    dummyEdge2 = new StageEdge(edgeId2, edgeProperties, srcMockVertex, dstMockVertex2,
-        srcStage, dstStage2, CODER, false);
+    dummyEdge2 = new StageEdgeBuilder(edgeId2)
+        .setEdgeProperties(edgeProperties)
+        .setSrcVertex(srcMockVertex)
+        .setDstVertex(dstMockVertex2)
+        .setSrcStage(srcStage)
+        .setDstStage(dstStage2)
+        .setSideInputFlag(false)
+        .build();
     // Initialize states in Master
     srcStage.getTaskIds().forEach(srcTaskId -> {
       final String blockId = RuntimeIdGenerator.generateBlockId(

-- 
To stop receiving notification emails like this one, please contact
johnyangk@apache.org.

Mime
View raw message