nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeongy...@apache.org
Subject [incubator-nemo] branch master updated: [NEMO-20] RESTful APIs to Access Job State and Metric (#61)
Date Tue, 03 Jul 2018 08:29:09 GMT
This is an automated email from the ASF dual-hosted git repository.

jeongyoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fe914c  [NEMO-20] RESTful APIs to Access Job State and Metric (#61)
9fe914c is described below

commit 9fe914c81d3b577499ee24370421c7260bf06e9d
Author: Jae Hyeon Park <usezmap@gmail.com>
AuthorDate: Tue Jul 3 17:29:06 2018 +0900

    [NEMO-20] RESTful APIs to Access Job State and Metric (#61)
    
    JIRA: NEMO-20: RESTful APIs to Access Job State and Metric
    
    Major changes:
    * Removed previous MetricCollector.
    * Add Metric interface to make metrics easier to handle.
    * Add MetricStore which collects all metric data at the master side.
    * Add HTTP and WebSocket REST API endpoints.
    
    Minor changes to note:
    N/A
    
    Tests for the changes:
    * Simple JSON dump test at MetricStoreTest.
    
    Other comments:
    N/A
    
    resolves NEMO-20
---
 .../exception/UnsupportedMetricException.java      |  18 +-
 pom.xml                                            |   2 +
 runtime/common/pom.xml                             |  10 +
 .../runtime/common/metric/DataTransferEvent.java   |  54 +++++
 .../edu/snu/nemo/runtime/common/metric/Event.java} |  33 +--
 .../snu/nemo/runtime/common/metric/JobMetric.java  |  80 +++++++
 .../MetricFlushPeriod.java => Metric.java}         |  23 +-
 .../snu/nemo/runtime/common/metric/MetricData.java |  71 ------
 .../runtime/common/metric/MetricDataBuilder.java   |  97 --------
 .../nemo/runtime/common/metric/StageMetric.java    |  59 +++++
 .../nemo/runtime/common/metric/StateMetric.java}   |  29 +--
 .../common/metric/StateTransitionEvent.java}       |  37 +--
 .../snu/nemo/runtime/common/metric/TaskMetric.java | 146 ++++++++++++
 runtime/common/src/main/proto/ControlMessage.proto |   6 +-
 .../snu/nemo/runtime/executor/MetricCollector.java |  67 ------
 .../nemo/runtime/executor/MetricManagerWorker.java |  21 +-
 .../nemo/runtime/executor/MetricMessageSender.java |   8 +-
 .../nemo/runtime/executor/TaskStateManager.java    |  26 +--
 .../nemo/runtime/executor/task/DataFetcher.java    |   8 -
 .../executor/task/ParentTaskDataFetcher.java       |  24 +-
 .../executor/task/SourceVertexDataFetcher.java     |  11 +-
 .../nemo/runtime/executor/task/TaskExecutor.java   |  53 +++--
 .../executor/task/ParentTaskDataFetcherTest.java   |   1 -
 .../runtime/executor/task/TaskExecutorTest.java    |   2 +-
 runtime/master/pom.xml                             |  20 ++
 .../snu/nemo/runtime/master/JobStateManager.java   |  82 ++-----
 .../snu/nemo/runtime/master/MetricBroadcaster.java | 102 +++++++++
 .../nemo/runtime/master/MetricManagerMaster.java   |  34 ++-
 .../nemo/runtime/master/MetricMessageHandler.java  |  18 +-
 .../edu/snu/nemo/runtime/master/MetricStore.java   | 254 +++++++++++++++++++++
 .../edu/snu/nemo/runtime/master/RuntimeMaster.java |  44 +++-
 .../runtime/master/servlet/AllMetricServlet.java   |  38 +++
 .../runtime/master/servlet/JobMetricServlet.java   |  39 ++++
 .../runtime/master/servlet/StageMetricServlet.java |  39 ++++
 .../runtime/master/servlet/TaskMetricServlet.java  |  39 ++++
 .../master/servlet/WebSocketMetricAdapter.java     |  62 +++++
 .../master/servlet/WebSocketMetricServlet.java}    |  17 +-
 .../snu/nemo/runtime/master/MetricStoreTest.java   |  52 +++++
 38 files changed, 1258 insertions(+), 468 deletions(-)

diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/parameter/MetricFlushPeriod.java b/common/src/main/java/edu/snu/nemo/common/exception/UnsupportedMetricException.java
similarity index 62%
copy from runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/parameter/MetricFlushPeriod.java
copy to common/src/main/java/edu/snu/nemo/common/exception/UnsupportedMetricException.java
index d81db8c..f7a3296 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/parameter/MetricFlushPeriod.java
+++ b/common/src/main/java/edu/snu/nemo/common/exception/UnsupportedMetricException.java
@@ -13,14 +13,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.common.metric.parameter;
-
-import org.apache.reef.tang.annotations.Name;
-import org.apache.reef.tang.annotations.NamedParameter;
+package edu.snu.nemo.common.exception;
 
 /**
- * Metric flushing period.
+ * UnsupportedMetricException.
+ * This exception will be thrown when MetricStore receives an unsupported metric.
  */
-@NamedParameter(doc = "Metric flushing period (ms)", short_name = "mf_period", default_value = "5000")
-public final class MetricFlushPeriod implements Name<Long> {
+public final class UnsupportedMetricException extends RuntimeException {
+  /**
+   * UnsupportedMetricException.
+   * @param cause cause
+   */
+  public UnsupportedMetricException(final Throwable cause) {
+    super(cause);
+  }
 }
diff --git a/pom.xml b/pom.xml
index 95939f0..42ad7ab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -42,6 +42,8 @@ limitations under the License.
         <jackson.version>2.8.8</jackson.version>
         <netlib.version>1.1.2</netlib.version>
         <netty.version>4.1.16.Final</netty.version>
+        <jetty-server.version>9.4.10.v20180503</jetty-server.version>
+        <jetty-servlet.version>9.4.10.v20180503</jetty-servlet.version>
         <slf4j.version>1.7.20</slf4j.version>
         <!-- Tests -->
         <mockito.version>2.13.0</mockito.version>
diff --git a/runtime/common/pom.xml b/runtime/common/pom.xml
index cedf02d..97fca5b 100644
--- a/runtime/common/pom.xml
+++ b/runtime/common/pom.xml
@@ -68,5 +68,15 @@ limitations under the License.
             <version>${grpc.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
     </dependencies>
 </project>
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/DataTransferEvent.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/DataTransferEvent.java
new file mode 100644
index 0000000..3f28244
--- /dev/null
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/DataTransferEvent.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.common.metric;
+
+/**
+ * Event for data transfer, such as data read or write.
+ */
+public class DataTransferEvent extends Event {
+  private TransferType transferType;
+
+  public DataTransferEvent(final long timestamp, final TransferType transferType) {
+    super(timestamp);
+    this.transferType = transferType;
+  }
+
+  /**
+   * Get transfer type.
+   * @return TransferType.
+   */
+  public final TransferType getTransferType() {
+    return transferType;
+  }
+
+  /**
+   * Set transfer type.
+   * @param transferType TransferType to set.
+   */
+  public final void setTransferType(final TransferType transferType) {
+    this.transferType = transferType;
+  }
+
+  /**
+   * Enum of transfer types.
+   */
+  public enum TransferType {
+    READ_START,
+    READ_END,
+    WRITE_START,
+    WRITE_END
+  }
+}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricMessageSender.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/Event.java
similarity index 51%
copy from runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricMessageSender.java
copy to runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/Event.java
index dcd5155..0746bd2 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricMessageSender.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/Event.java
@@ -13,30 +13,37 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.executor;
+package edu.snu.nemo.runtime.common.metric;
 
-import org.apache.reef.tang.annotations.DefaultImplementation;
+import java.io.Serializable;
 
 /**
- * Interface for metric sender.
+ * Class for all generic events; each one contains the timestamp of the moment it occurred.
  */
-@DefaultImplementation(MetricManagerWorker.class)
-public interface MetricMessageSender extends AutoCloseable {
+public class Event implements Serializable {
+  private long timestamp;
 
   /**
-   * Send metric to master.
-   * @param metricKey key of the metric
-   * @param metricValue value of the metric
+   * Constructor.
+   * @param timestamp timestamp in milliseconds.
    */
-  void send(final String metricKey, final String metricValue);
+  public Event(final long timestamp) {
+    this.timestamp = timestamp;
+  }
 
   /**
-   * Flush all metric inside of the queue.
+   * Get timestamp.
+   * @return timestamp.
    */
-  void flush();
+  public final long getTimestamp() {
+    return timestamp;
+  };
 
   /**
-   * Flush the metric queue and close the metric dispatch.
+   * Set timestamp.
+   * @param timestamp timestamp in milliseconds.
    */
-  void close();
+  public final void setTimestamp(final long timestamp) {
+    this.timestamp = timestamp;
+  }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/JobMetric.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/JobMetric.java
new file mode 100644
index 0000000..fea3a20
--- /dev/null
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/JobMetric.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.common.metric;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.state.JobState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Metric class for Job (or {@link PhysicalPlan}).
+ */
+public final class JobMetric implements StateMetric<JobState.State> {
+  private String id;
+  private List<StateTransitionEvent<JobState.State>> stateTransitionEvents = new ArrayList<>();
+  private JsonNode stageDagJson;
+
+  public JobMetric(final PhysicalPlan physicalPlan) {
+    this.id = physicalPlan.getId();
+  }
+
+  public JobMetric(final String id) {
+    this.id = id;
+  }
+
+  @JsonProperty("dag")
+  public JsonNode getStageDAG() {
+    return stageDagJson;
+  }
+
+  public void setStageDAG(final DAG dag) {
+    final String dagJson = dag.toString();
+    final ObjectMapper objectMapper = new ObjectMapper();
+    try {
+      this.stageDagJson = objectMapper.readTree(dagJson);
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public String getId() {
+    return id;
+  }
+
+  @Override
+  public List<StateTransitionEvent<JobState.State>> getStateTransitionEvents() {
+    return stateTransitionEvents;
+  }
+
+  @Override
+  public void addEvent(final JobState.State prevState, final JobState.State newState) {
+    stateTransitionEvents.add(new StateTransitionEvent<>(System.currentTimeMillis(), prevState, newState));
+  }
+
+  @Override
+  public boolean processMetricMessage(final String metricField, final byte[] metricValue) {
+    // do nothing
+    return false;
+  }
+}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/parameter/MetricFlushPeriod.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/Metric.java
similarity index 56%
copy from runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/parameter/MetricFlushPeriod.java
copy to runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/Metric.java
index d81db8c..1d44c08 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/parameter/MetricFlushPeriod.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/Metric.java
@@ -13,14 +13,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.common.metric.parameter;
-
-import org.apache.reef.tang.annotations.Name;
-import org.apache.reef.tang.annotations.NamedParameter;
+package edu.snu.nemo.runtime.common.metric;
 
 /**
- * Metric flushing period.
+ * Interface for all metrics.
  */
-@NamedParameter(doc = "Metric flushing period (ms)", short_name = "mf_period", default_value = "5000")
-public final class MetricFlushPeriod implements Name<Long> {
+public interface Metric {
+  /**
+   * Get its unique id.
+   * @return a unique id
+   */
+  String getId();
+
+  /**
+   * Process metric message from evaluators.
+   * @param metricField field name of the metric.
+   * @param metricValue byte array of serialized data value.
+   * @return true if the metric was changed or false if not.
+   */
+  boolean processMetricMessage(final String metricField, final byte[] metricValue);
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/MetricData.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/MetricData.java
deleted file mode 100644
index ce61cbf..0000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/MetricData.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.metric;
-
-import edu.snu.nemo.common.exception.JsonParseException;
-import org.codehaus.jackson.map.ObjectMapper;
-
-import java.util.Map;
-
-/**
- * MetricData that holds executor side metrics.
- */
-public class MetricData {
-  /**
-   * Computation units are: Job, State, Task.
-   */
-  private final String computationUnitId;
-  private final ObjectMapper objectMapper;
-  private final Map<String, Object> metrics;
-
-  /**
-   * Constructor.
-   * @param computationUnitId the id of the computation unit.
-   * @param metrics the metric data.
-   */
-  MetricData(final String computationUnitId,
-             final Map<String, Object> metrics) {
-    this.computationUnitId = computationUnitId;
-    this.objectMapper = new ObjectMapper();
-    this.metrics = metrics;
-  }
-
-  /**
-   * @return the computation unit id.
-   */
-  public final String getComputationUnitId() {
-    return computationUnitId;
-  }
-
-  /**
-   * @return the metric data.
-   */
-  public final Map<String, Object> getMetrics() {
-    return metrics;
-  }
-
-  /**
-   * @return a JSON expression of the metric data.
-   */
-  public final String toJson() {
-    try {
-      final String jsonStr = objectMapper.writeValueAsString(metrics);
-      return jsonStr;
-    } catch (final Exception e) {
-      throw new JsonParseException(e);
-    }
-  }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/MetricDataBuilder.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/MetricDataBuilder.java
deleted file mode 100644
index 2031ede..0000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/MetricDataBuilder.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.snu.nemo.runtime.common.metric;
-
-import java.util.Map;
-
-/**
- * MetricData Builder.
- */
-public final class MetricDataBuilder {
-  private final String computationUnitId;
-  private long startTime;
-  private long endTime;
-  private Map<String, Object> metrics;
-
-  /**
-   * Constructor.
-   * @param computationUnitId id of the computation unit.
-   */
-  public MetricDataBuilder(final String computationUnitId) {
-    this.computationUnitId = computationUnitId;
-    startTime = 0;
-    endTime = 0;
-    metrics = null;
-  }
-
-  /**
-   * @return the id of the computation unit.
-   */
-  public String getComputationUnitId() {
-    return computationUnitId;
-  }
-
-  /**
-   * @return the metric data.
-   */
-  public Map<String, Object> getMetrics() {
-    return metrics;
-  }
-
-  /**
-   * @return the time at which metric collection starts.
-   */
-  public long getStartTime() {
-    return startTime;
-  }
-
-  /**
-   * @return the time at which metric collection ends.
-   */
-  public long getEndTime() {
-    return endTime;
-  }
-
-  /**
-   * Begin the measurement of metric data.
-   * @param metricMap map on which to collect metrics.
-   */
-  public void beginMeasurement(final Map<String, Object> metricMap) {
-    startTime = System.currentTimeMillis();
-    metricMap.put("StartTime", startTime);
-    this.metrics = metricMap;
-  }
-
-  /**
-   * End the measurement of metric data.
-   * @param metricMap map on which to collect metrics.
-   */
-  public void endMeasurement(final Map<String, Object> metricMap) {
-    endTime = System.currentTimeMillis();
-    metricMap.put("EndTime", endTime);
-    metricMap.put("ElapsedTime(ms)", endTime - startTime);
-    this.metrics.putAll(metricMap);
-  }
-
-  /**
-   * Builds immutable MetricData.
-   * @return the MetricData constructed by the builder.
-   */
-  public MetricData build() {
-    return new MetricData(getComputationUnitId(), getMetrics());
-  }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StageMetric.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StageMetric.java
new file mode 100644
index 0000000..ff174a9
--- /dev/null
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StageMetric.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.common.metric;
+
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.state.StageState;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Metric class for {@link Stage}.
+ */
+public class StageMetric implements StateMetric<StageState.State> {
+  private String id;
+  private List<StateTransitionEvent<StageState.State>> stateTransitionEvents = new ArrayList<>();
+
+  public StageMetric(final Stage stage) {
+    this.id = stage.getId();
+  }
+
+  public StageMetric(final String id) {
+    this.id = id;
+  }
+
+  @Override
+  public final String getId() {
+    return id;
+  }
+
+  @Override
+  public final List<StateTransitionEvent<StageState.State>> getStateTransitionEvents() {
+    return stateTransitionEvents;
+  }
+
+  @Override
+  public final void addEvent(final StageState.State prevState, final StageState.State newState) {
+    stateTransitionEvents.add(new StateTransitionEvent<>(System.currentTimeMillis(), prevState, newState));
+  }
+
+  @Override
+  public final boolean processMetricMessage(final String metricField, final byte[] metricValue) {
+    // do nothing
+    return false;
+  }
+}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricMessageSender.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateMetric.java
similarity index 53%
copy from runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricMessageSender.java
copy to runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateMetric.java
index dcd5155..426e8e0 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricMessageSender.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateMetric.java
@@ -13,30 +13,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.executor;
+package edu.snu.nemo.runtime.common.metric;
 
-import org.apache.reef.tang.annotations.DefaultImplementation;
+import java.util.List;
 
 /**
- * Interface for metric sender.
+ * Interface for a metric which contains its state.
+ * @param <T> class of state of the metric.
  */
-@DefaultImplementation(MetricManagerWorker.class)
-public interface MetricMessageSender extends AutoCloseable {
-
-  /**
-   * Send metric to master.
-   * @param metricKey key of the metric
-   * @param metricValue value of the metric
-   */
-  void send(final String metricKey, final String metricValue);
-
+public interface StateMetric<T> extends Metric {
   /**
-   * Flush all metric inside of the queue.
+   * Get its list of {@link StateTransitionEvent}.
+   * @return list of events.
    */
-  void flush();
+  List<StateTransitionEvent<T>> getStateTransitionEvents();
 
   /**
-   * Flush the metric queue and close the metric dispatch.
+   * Add a {@link StateTransitionEvent} to the metric.
+   * @param prevState previous state.
+   * @param newState new state.
    */
-  void close();
+  void addEvent(final T prevState, final T newState);
 }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricMessageSender.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateTransitionEvent.java
similarity index 50%
copy from runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricMessageSender.java
copy to runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateTransitionEvent.java
index dcd5155..43ce124 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricMessageSender.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateTransitionEvent.java
@@ -13,30 +13,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.executor;
-
-import org.apache.reef.tang.annotations.DefaultImplementation;
+package edu.snu.nemo.runtime.common.metric;
 
 /**
- * Interface for metric sender.
+ * Event of state transition. It contains timestamp and the state transition.
+ * @param <T> class of state for the metric.
  */
-@DefaultImplementation(MetricManagerWorker.class)
-public interface MetricMessageSender extends AutoCloseable {
+public final class StateTransitionEvent<T> extends Event {
+  private T prevState;
+  private T newState;
 
-  /**
-   * Send metric to master.
-   * @param metricKey key of the metric
-   * @param metricValue value of the metric
-   */
-  void send(final String metricKey, final String metricValue);
+  public StateTransitionEvent(final long timestamp, final T prevState, final T newState) {
+    super(timestamp);
+    this.prevState = prevState;
+    this.newState = newState;
+  }
 
   /**
-   * Flush all metric inside of the queue.
+   * Get previous state.
+   * @return previous state.
    */
-  void flush();
+  public T getPrevState() {
+    return prevState;
+  }
 
   /**
-   * Flush the metric queue and close the metric dispatch.
+   * Get new state.
+   * @return new state.
    */
-  void close();
+  public T getNewState() {
+    return newState;
+  }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/TaskMetric.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/TaskMetric.java
new file mode 100644
index 0000000..db24f4d
--- /dev/null
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/TaskMetric.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.common.metric;
+
+import edu.snu.nemo.runtime.common.state.TaskState;
+import org.apache.commons.lang3.SerializationUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Metric class for {@link edu.snu.nemo.runtime.common.plan.Task}.
+ */
+public class TaskMetric implements StateMetric<TaskState.State> {
+  private String id;
+  private List<StateTransitionEvent<TaskState.State>> stateTransitionEvents = new ArrayList<>();
+  private long serializedReadBytes = -1;
+  private long encodedReadBytes = -1;
+  private long writtenBytes = -1;
+  private long boundedSourceReadTime = -1;
+  private int scheduleAttempt = -1;
+  private String containerId = "";
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskMetric.class.getName());
+
+  public TaskMetric(final String id) {
+    this.id = id;
+  }
+
+  public final long getSerializedReadBytes() {
+    return serializedReadBytes;
+  }
+
+  private void setSerializedReadBytes(final long serializedReadBytes) {
+    this.serializedReadBytes = serializedReadBytes;
+  }
+
+  public final long getEncodedReadBytes() {
+    return encodedReadBytes;
+  }
+
+  private void setEncodedReadBytes(final long encodedReadBytes) {
+    this.encodedReadBytes = encodedReadBytes;
+  }
+
+  public final long getBoundedSourceReadTime() {
+    return boundedSourceReadTime;
+  }
+
+  private void setBoundedSourceReadTime(final long boundedSourceReadTime) {
+    this.boundedSourceReadTime = boundedSourceReadTime;
+  }
+
+  public final long getWrittenBytes() {
+    return writtenBytes;
+  }
+
+  private void setWrittenBytes(final long writtenBytes) {
+    this.writtenBytes = writtenBytes;
+  }
+
+  public final int getScheduleAttempt() {
+    return scheduleAttempt;
+  }
+
+  private void setScheduleAttempt(final int scheduleAttempt) {
+    this.scheduleAttempt = scheduleAttempt;
+  }
+
+  public final String getContainerId() {
+    return containerId;
+  }
+
+  private void setContainerId(final String containerId) {
+    this.containerId = containerId;
+  }
+
+  @Override
+  public final List<StateTransitionEvent<TaskState.State>> getStateTransitionEvents() {
+    return stateTransitionEvents;
+  }
+
+  @Override
+  public final String getId() {
+    return id;
+  }
+
+  @Override
+  public final void addEvent(final TaskState.State prevState, final TaskState.State newState) {
+    stateTransitionEvents.add(new StateTransitionEvent<>(System.currentTimeMillis(), prevState, newState));
+  }
+
+  private void addEvent(final StateTransitionEvent<TaskState.State> event) {
+    stateTransitionEvents.add(event);
+  }
+
+  @Override
+  public final boolean processMetricMessage(final String metricField, final byte[] metricValue) {
+    LOG.info("metric {} is just arrived!", metricField);
+    switch (metricField) {
+      case "serializedReadBytes":
+        setSerializedReadBytes(SerializationUtils.deserialize(metricValue));
+        break;
+      case "encodedReadBytes":
+        setEncodedReadBytes(SerializationUtils.deserialize(metricValue));
+        break;
+      case "boundedSourceReadTime":
+        setBoundedSourceReadTime(SerializationUtils.deserialize(metricValue));
+        break;
+      case "writtenBytes":
+        setWrittenBytes(SerializationUtils.deserialize(metricValue));
+        break;
+      case "stateTransitionEvent":
+        final StateTransitionEvent<TaskState.State> newStateTransitionEvent =
+            SerializationUtils.deserialize(metricValue);
+        addEvent(newStateTransitionEvent);
+        break;
+      case "scheduleAttempt":
+        setScheduleAttempt(SerializationUtils.deserialize(metricValue));
+        break;
+      case "containerId":
+        setContainerId(SerializationUtils.deserialize(metricValue));
+        break;
+      default:
+        LOG.warn("metricField {} is not supported.", metricField);
+        return false;
+    }
+    return true;
+  }
+}
diff --git a/runtime/common/src/main/proto/ControlMessage.proto b/runtime/common/src/main/proto/ControlMessage.proto
index 7a3cd7f..b0b1747 100644
--- a/runtime/common/src/main/proto/ControlMessage.proto
+++ b/runtime/common/src/main/proto/ControlMessage.proto
@@ -187,6 +187,8 @@ enum BlockStore {
 
 // Common messages
 message Metric {
-    required string metricKey = 1;
-    required string metricValue = 2;
+    required string metricType = 1;
+    required string metricId = 2;
+    required string metricField = 3;
+    required bytes metricValue = 4;
 }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricCollector.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricCollector.java
deleted file mode 100644
index 3e53b2e..0000000
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricCollector.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.executor;
-
-import edu.snu.nemo.runtime.common.metric.MetricDataBuilder;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This metric collector collects metrics and send through {@link MetricMessageSender}.
- */
-public final class MetricCollector {
-
-  private final MetricMessageSender metricMessageSender;
-  private final Map<String, MetricDataBuilder> metricDataBuilderMap;
-
-  /**
-   * Constructor.
-   *
-   * @param metricMessageSender the metric message sender.
-   */
-  public MetricCollector(final MetricMessageSender metricMessageSender) {
-    this.metricMessageSender = metricMessageSender;
-    this.metricDataBuilderMap = new HashMap<>();
-  }
-
-  /**
-   * Begins recording the start time of this metric measurement, in addition to the metric given.
-   * This method ensures thread-safety by synchronizing its callers.
-   *
-   * @param compUnitId    to be used as metricKey
-   * @param initialMetric metric to add
-   */
-  public void beginMeasurement(final String compUnitId, final Map<String, Object> initialMetric) {
-    final MetricDataBuilder metricDataBuilder = new MetricDataBuilder(compUnitId);
-    metricDataBuilder.beginMeasurement(initialMetric);
-    metricDataBuilderMap.put(compUnitId, metricDataBuilder);
-  }
-
-  /**
-   * Ends this metric measurement, recording the end time in addition to the metric given.
-   * This method ensures thread-safety by synchronizing its callers.
-   *
-   * @param compUnitId  to be used as metricKey
-   * @param finalMetric metric to add
-   */
-  public void endMeasurement(final String compUnitId, final Map<String, Object> finalMetric) {
-    final MetricDataBuilder metricDataBuilder = metricDataBuilderMap.get(compUnitId);
-    metricDataBuilder.endMeasurement(finalMetric);
-    metricMessageSender.send(compUnitId, metricDataBuilder.build().toJson());
-    metricDataBuilderMap.remove(compUnitId);
-  }
-}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
index f6ffd08..405ce4c 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
@@ -15,14 +15,13 @@
  */
 package edu.snu.nemo.runtime.executor;
 
+import com.google.protobuf.ByteString;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.common.exception.UnknownFailureCauseException;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
-import edu.snu.nemo.runtime.common.metric.parameter.MetricFlushPeriod;
 import org.apache.reef.annotations.audience.EvaluatorSide;
-import org.apache.reef.tang.annotations.Parameter;
 
 import javax.inject.Inject;
 import java.util.concurrent.*;
@@ -40,17 +39,17 @@ public final class MetricManagerWorker implements MetricMessageSender {
   private final BlockingQueue<ControlMessage.Metric> metricMessageQueue;
   private final PersistentConnectionToMasterMap persistentConnectionToMasterMap;
 
+  private static final int FLUSHING_PERIOD = 3000;
   private static final Logger LOG = LoggerFactory.getLogger(MetricManagerWorker.class.getName());
 
   @Inject
-  private MetricManagerWorker(@Parameter(MetricFlushPeriod.class) final long flushingPeriod,
-                              final PersistentConnectionToMasterMap persistentConnectionToMasterMap) {
+  private MetricManagerWorker(final PersistentConnectionToMasterMap persistentConnectionToMasterMap) {
     this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
     this.metricMessageQueue = new LinkedBlockingQueue<>();
     this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
     final Runnable batchMetricMessages = () -> flushMetricMessageQueueToMaster();
     this.scheduledExecutorService.scheduleAtFixedRate(batchMetricMessages, 0,
-                                                      flushingPeriod, TimeUnit.MILLISECONDS);
+                                                      FLUSHING_PERIOD, TimeUnit.MILLISECONDS);
   }
 
   @Override
@@ -89,15 +88,19 @@ public final class MetricManagerWorker implements MetricMessageSender {
   }
 
   @Override
-  public void send(final String metricKey, final String metricValue) {
-    LOG.debug("Executor logged! {}", metricKey);
+  public void send(final String metricType, final String metricId,
+                   final String metricField, final byte[] metricValue) {
     metricMessageQueue.add(
-        ControlMessage.Metric.newBuilder().setMetricKey(metricKey).setMetricValue(metricValue).build());
+        ControlMessage.Metric.newBuilder()
+            .setMetricType(metricType)
+            .setMetricId(metricId)
+            .setMetricField(metricField)
+            .setMetricValue(ByteString.copyFrom(metricValue))
+            .build());
   }
 
   @Override
   public void close() throws UnknownFailureCauseException {
-    LOG.info("Shutting down MetricManager ");
     scheduledExecutorService.shutdownNow();
     flushMetricMessageQueueToMaster();
   }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricMessageSender.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricMessageSender.java
index dcd5155..a7693f6 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricMessageSender.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricMessageSender.java
@@ -25,10 +25,12 @@ public interface MetricMessageSender extends AutoCloseable {
 
   /**
    * Send metric to master.
-   * @param metricKey key of the metric
-   * @param metricValue value of the metric
+   * @param metricType type of the metric
+   * @param metricId id of the metric
+   * @param metricField field of the metric
+   * @param metricValue serialized value of the metric
    */
-  void send(final String metricKey, final String metricValue);
+  void send(final String metricType, final String metricId, final String metricField, final byte[] metricValue);
 
   /**
    * Flush all metric inside of the queue.
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
index 9645621..a707b03 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
@@ -21,11 +21,13 @@ import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
+import edu.snu.nemo.runtime.common.metric.StateTransitionEvent;
 import edu.snu.nemo.runtime.common.plan.Task;
 
 import java.util.*;
 
 import edu.snu.nemo.runtime.common.state.TaskState;
+import org.apache.commons.lang3.SerializationUtils;
 import org.apache.reef.annotations.audience.EvaluatorSide;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +43,7 @@ public final class TaskStateManager {
   private final String taskId;
   private final int attemptIdx;
   private final String executorId;
-  private final MetricCollector metricCollector;
+  private final MetricMessageSender metricMessageSender;
   private final PersistentConnectionToMasterMap persistentConnectionToMasterMap;
 
   public TaskStateManager(final Task task,
@@ -52,7 +54,12 @@ public final class TaskStateManager {
     this.attemptIdx = task.getAttemptIdx();
     this.executorId = executorId;
     this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
-    this.metricCollector = new MetricCollector(metricMessageSender);
+    this.metricMessageSender = metricMessageSender;
+
+    metricMessageSender.send("TaskMetric", taskId,
+        "containerId", SerializationUtils.serialize(executorId));
+    metricMessageSender.send("TaskMetric", taskId,
+        "scheduleAttempt", SerializationUtils.serialize(attemptIdx));
   }
 
   /**
@@ -64,32 +71,25 @@ public final class TaskStateManager {
   public synchronized void onTaskStateChanged(final TaskState.State newState,
                                               final Optional<String> vertexPutOnHold,
                                               final Optional<TaskState.RecoverableTaskFailureCause> cause) {
-    final Map<String, Object> metric = new HashMap<>();
+    metricMessageSender.send("TaskMetric", taskId,
+        "stateTransitionEvent", SerializationUtils.serialize(new StateTransitionEvent<>(
+            System.currentTimeMillis(), null, newState
+        )));
 
     switch (newState) {
       case EXECUTING:
         LOG.debug("Executing Task ID {}...", this.taskId);
-        metric.put("ContainerId", executorId);
-        metric.put("ScheduleAttempt", attemptIdx);
-        metric.put("FromState", newState);
-        metricCollector.beginMeasurement(taskId, metric);
         break;
       case COMPLETE:
         LOG.debug("Task ID {} complete!", this.taskId);
-        metric.put("ToState", newState);
-        metricCollector.endMeasurement(taskId, metric);
         notifyTaskStateToMaster(newState, Optional.empty(), cause);
         break;
       case SHOULD_RETRY:
         LOG.debug("Task ID {} failed (recoverable).", this.taskId);
-        metric.put("ToState", newState);
-        metricCollector.endMeasurement(taskId, metric);
         notifyTaskStateToMaster(newState, Optional.empty(), cause);
         break;
       case FAILED:
         LOG.debug("Task ID {} failed (unrecoverable).", this.taskId);
-        metric.put("ToState", newState);
-        metricCollector.endMeasurement(taskId, metric);
         notifyTaskStateToMaster(newState, Optional.empty(), cause);
         break;
       case ON_HOLD:
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/DataFetcher.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/DataFetcher.java
index 3dbc689..c7aeb0e 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/DataFetcher.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/DataFetcher.java
@@ -18,7 +18,6 @@ package edu.snu.nemo.runtime.executor.task;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 
 import java.io.IOException;
-import java.util.Map;
 
 /**
  * An abstraction for fetching data from task-external sources.
@@ -26,18 +25,15 @@ import java.util.Map;
 abstract class DataFetcher {
   private final IRVertex dataSource;
   private final VertexHarness child;
-  private final Map<String, Object> metricMap;
   private final boolean isToSideInput;
   private final boolean isFromSideInput;
 
   DataFetcher(final IRVertex dataSource,
               final VertexHarness child,
-              final Map<String, Object> metricMap,
               final boolean isFromSideInput,
               final boolean isToSideInput) {
     this.dataSource = dataSource;
     this.child = child;
-    this.metricMap = metricMap;
     this.isToSideInput = isToSideInput;
     this.isFromSideInput = isFromSideInput;
   }
@@ -50,10 +46,6 @@ abstract class DataFetcher {
    */
   abstract Object fetchDataElement() throws IOException;
 
-  protected Map<String, Object> getMetricMap() {
-    return metricMap;
-  }
-
   VertexHarness getChild() {
     return child;
   }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index 1f4276a..68ba2c1 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -24,7 +24,6 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.concurrent.NotThreadSafe;
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -44,22 +43,21 @@ class ParentTaskDataFetcher extends DataFetcher {
   private DataUtil.IteratorWithNumBytes currentIterator;
   private int currentIteratorIndex;
   private boolean noElementAtAll = true;
+  private long serBytes = 0;
+  private long encodedBytes = 0;
 
   ParentTaskDataFetcher(final IRVertex dataSource,
                         final InputReader readerForParentTask,
                         final VertexHarness child,
-                        final Map<String, Object> metricMap,
                         final boolean isToSideInput) {
-    super(dataSource, child, metricMap, readerForParentTask.isSideInputReader(), isToSideInput);
+    super(dataSource, child, readerForParentTask.isSideInputReader(), isToSideInput);
     this.readersForParentTask = readerForParentTask;
     this.hasFetchStarted = false;
     this.currentIteratorIndex = 0;
     this.iteratorQueue = new LinkedBlockingQueue<>();
   }
 
-  private void handleMetric(final DataUtil.IteratorWithNumBytes iterator) {
-    long serBytes = 0;
-    long encodedBytes = 0;
+  private void countBytes(final DataUtil.IteratorWithNumBytes iterator) {
     try {
       serBytes += iterator.getNumSerializedBytes();
     } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
@@ -74,10 +72,6 @@ class ParentTaskDataFetcher extends DataFetcher {
     } catch (final IllegalStateException e) {
       LOG.error("Failed to get the number of bytes of encoded data - the data is not ready yet ", e);
     }
-    if (serBytes != encodedBytes) {
-      getMetricMap().put("ReadBytes(raw)", serBytes);
-    }
-    getMetricMap().put("ReadBytes", encodedBytes);
   }
 
   /**
@@ -126,7 +120,7 @@ class ParentTaskDataFetcher extends DataFetcher {
           }
         } else {
           // Advance to the next one
-          handleMetric(currentIterator);
+          countBytes(currentIterator);
           advanceIterator();
           return fetchDataElement();
         }
@@ -160,4 +154,12 @@ class ParentTaskDataFetcher extends DataFetcher {
       this.currentIteratorIndex++;
     }
   }
+
+  public final long getSerializedBytes() {
+    return serBytes;
+  }
+
+  public final long getEncodedBytes() {
+    return encodedBytes;
+  }
 }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java
index 116b9c4..817139b 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java
@@ -20,7 +20,6 @@ import edu.snu.nemo.common.ir.vertex.IRVertex;
 
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.Map;
 
 /**
  * Fetches data from a data source.
@@ -30,13 +29,13 @@ class SourceVertexDataFetcher extends DataFetcher {
 
   // Non-finals (lazy fetching)
   private Iterator iterator;
+  private long boundedSourceReadTime = 0;
 
   SourceVertexDataFetcher(final IRVertex dataSource,
                           final Readable readable,
                           final VertexHarness child,
-                          final Map<String, Object> metricMap,
                           final boolean isToSideInput) {
-    super(dataSource, child, metricMap, false, isToSideInput);
+    super(dataSource, child, false, isToSideInput);
     this.readable = readable;
   }
 
@@ -45,7 +44,7 @@ class SourceVertexDataFetcher extends DataFetcher {
     if (iterator == null) {
       final long start = System.currentTimeMillis();
       iterator = this.readable.read().iterator();
-      getMetricMap().put("BoundedSourceReadTime(ms)", System.currentTimeMillis() - start);
+      boundedSourceReadTime += System.currentTimeMillis() - start;
     }
 
     if (iterator.hasNext()) {
@@ -54,4 +53,8 @@ class SourceVertexDataFetcher extends DataFetcher {
       return null;
     }
   }
+
+  public final long getBoundedSourceReadTime() {
+    return boundedSourceReadTime;
+  }
 }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
index 78974b4..741bb30 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
@@ -31,7 +31,6 @@ import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.state.TaskState;
-import edu.snu.nemo.runtime.executor.MetricCollector;
 import edu.snu.nemo.runtime.executor.MetricMessageSender;
 import edu.snu.nemo.runtime.executor.TaskStateManager;
 import edu.snu.nemo.runtime.executor.datatransfer.*;
@@ -39,6 +38,8 @@ import edu.snu.nemo.runtime.executor.datatransfer.*;
 import java.io.IOException;
 import java.util.*;
 import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,8 +64,10 @@ public final class TaskExecutor {
   private final Map sideInputMap;
 
   // Metrics information
-  private final Map<String, Object> metricMap;
-  private final MetricCollector metricCollector;
+  private long boundedSourceReadTime = 0;
+  private long serializedReadBytes = 0;
+  private long encodedReadBytes = 0;
+  private final MetricMessageSender metricMessageSender;
 
   // Dynamic optimization
   private String idOfVertexPutOnHold;
@@ -90,9 +93,8 @@ public final class TaskExecutor {
     this.taskId = task.getTaskId();
     this.taskStateManager = taskStateManager;
 
-    // Metrics information
-    this.metricMap = new HashMap<>();
-    this.metricCollector = new MetricCollector(metricMessageSender);
+    // Metric sender
+    this.metricMessageSender = metricMessageSender;
 
     // Dynamic optimization
     // Assigning null is very bad, but we are keeping this for now
@@ -174,13 +176,13 @@ public final class TaskExecutor {
       final boolean isToSideInput = isToSideInputs.stream().anyMatch(bool -> bool);
       if (irVertex instanceof SourceVertex) {
         dataFetcherList.add(new SourceVertexDataFetcher(
-            irVertex, sourceReader.get(), vertexHarness, metricMap, isToSideInput)); // Source vertex read
+            irVertex, sourceReader.get(), vertexHarness, isToSideInput)); // Source vertex read
       }
       final List<InputReader> parentTaskReaders =
           getParentTaskReaders(taskIndex, irVertex, task.getTaskIncomingEdges(), dataTransferFactory);
       parentTaskReaders.forEach(parentTaskReader -> {
         dataFetcherList.add(new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
-            vertexHarness, metricMap, isToSideInput)); // Parent-task read
+            vertexHarness, isToSideInput)); // Parent-task read
       });
     });
 
@@ -254,7 +256,6 @@ public final class TaskExecutor {
     }
     LOG.info("{} started", taskId);
     taskStateManager.onTaskStateChanged(TaskState.State.EXECUTING, Optional.empty(), Optional.empty());
-    metricCollector.beginMeasurement(taskId, metricMap);
 
     // Phase 1: Consume task-external side-input related data.
     final Map<Boolean, List<DataFetcher>> sideInputRelated = dataFetchers.stream()
@@ -277,6 +278,13 @@ public final class TaskExecutor {
       return;
     }
 
+    metricMessageSender.send("TaskMetric", taskId,
+        "boundedSourceReadTime", SerializationUtils.serialize(boundedSourceReadTime));
+    metricMessageSender.send("TaskMetric", taskId,
+        "serializedReadBytes", SerializationUtils.serialize(serializedReadBytes));
+    metricMessageSender.send("TaskMetric", taskId,
+        "encodedReadBytes", SerializationUtils.serialize(encodedReadBytes));
+
     // Phase 3: Finalize task-internal states and elements
     for (final VertexHarness vertexHarness : sortedHarnesses) {
       if (finalizeLater.contains(vertexHarness)) {
@@ -284,8 +292,6 @@ public final class TaskExecutor {
       }
     }
 
-    // Miscellaneous: Metrics, DynOpt, etc
-    metricCollector.endMeasurement(taskId, metricMap);
     if (idOfVertexPutOnHold == null) {
       taskStateManager.onTaskStateChanged(TaskState.State.COMPLETE, Optional.empty(), Optional.empty());
       LOG.info("{} completed", taskId);
@@ -357,6 +363,12 @@ public final class TaskExecutor {
         }
 
         if (element == null) {
+          if (dataFetcher instanceof SourceVertexDataFetcher) {
+            boundedSourceReadTime += ((SourceVertexDataFetcher) dataFetcher).getBoundedSourceReadTime();
+          } else if (dataFetcher instanceof ParentTaskDataFetcher) {
+            serializedReadBytes += ((ParentTaskDataFetcher) dataFetcher).getSerializedBytes();
+            encodedReadBytes += ((ParentTaskDataFetcher) dataFetcher).getEncodedBytes();
+          }
           finishedFetcherIndex = i;
           break;
         } else {
@@ -470,11 +482,6 @@ public final class TaskExecutor {
    */
   private void finalizeOutputWriters(final VertexHarness vertexHarness) {
     final List<Long> writtenBytesList = new ArrayList<>();
-    final Map<String, Object> metric = new HashMap<>();
-    final IRVertex irVertex = vertexHarness.getIRVertex();
-
-    metricCollector.beginMeasurement(irVertex.getId(), metric);
-    final long writeStartTime = System.currentTimeMillis();
 
     vertexHarness.getWritersToChildrenTasks().forEach(outputWriter -> {
       outputWriter.close();
@@ -482,15 +489,11 @@ public final class TaskExecutor {
       writtenBytes.ifPresent(writtenBytesList::add);
     });
 
-    final long writeEndTime = System.currentTimeMillis();
-    metric.put("OutputWriteTime(ms)", writeEndTime - writeStartTime);
-    if (!writtenBytesList.isEmpty()) {
-      long totalWrittenBytes = 0;
-      for (final Long writtenBytes : writtenBytesList) {
-        totalWrittenBytes += writtenBytes;
-      }
-      metricMap.put("WrittenBytes", totalWrittenBytes);
+    long totalWrittenBytes = 0;
+    for (final Long writtenBytes : writtenBytesList) {
+      totalWrittenBytes += writtenBytes;
     }
-    metricCollector.endMeasurement(irVertex.getId(), metric);
+    metricMessageSender.send("TaskMetric", taskId,
+        "writtenBytes", SerializationUtils.serialize(totalWrittenBytes));
   }
 }
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
index e2b3826..931e751 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
@@ -108,7 +108,6 @@ public final class ParentTaskDataFetcherTest {
         mock(IRVertex.class),
         readerForParentTask, // This is the only argument that affects the behavior of ParentTaskDataFetcher
         mock(VertexHarness.class),
-        new HashMap<>(0),
         false);
   }
 
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
index b6b5bc4..f9c4663 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -102,7 +102,7 @@ public final class TaskExecutorTest {
 
     // Mock a MetricMessageSender.
     metricMessageSender = mock(MetricMessageSender.class);
-    doNothing().when(metricMessageSender).send(anyString(), anyString());
+    doNothing().when(metricMessageSender).send(anyString(), anyString(), anyString(), any());
     doNothing().when(metricMessageSender).close();
 
     persistentConnectionToMasterMap = mock(PersistentConnectionToMasterMap.class);
diff --git a/runtime/master/pom.xml b/runtime/master/pom.xml
index 6cb5abc..8ea4688 100644
--- a/runtime/master/pom.xml
+++ b/runtime/master/pom.xml
@@ -52,6 +52,26 @@ limitations under the License.
             <version>${project.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-server</artifactId>
+            <version>${jetty-server.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-servlet</artifactId>
+            <version>${jetty-servlet.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty.websocket</groupId>
+            <artifactId>websocket-api</artifactId>
+            <version>${jetty-servlet.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty.websocket</groupId>
+            <artifactId>websocket-server</artifactId>
+            <version>${jetty-servlet.version}</version>
+        </dependency>
+        <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
             <version>${jackson.version}</version>
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
index 2a0ef88..fd3e582 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
@@ -20,7 +20,6 @@ import edu.snu.nemo.common.exception.IllegalStateTransitionException;
 import edu.snu.nemo.common.exception.SchedulingException;
 import edu.snu.nemo.common.exception.UnknownExecutionStateException;
 import edu.snu.nemo.common.StateMachine;
-import edu.snu.nemo.runtime.common.metric.MetricDataBuilder;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.plan.Stage;
@@ -37,6 +36,9 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import edu.snu.nemo.runtime.common.state.TaskState;
+import edu.snu.nemo.runtime.common.metric.JobMetric;
+import edu.snu.nemo.runtime.common.metric.StageMetric;
+import edu.snu.nemo.runtime.common.metric.TaskMetric;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -87,7 +89,8 @@ public final class JobStateManager {
    * For metrics.
    */
   private final MetricMessageHandler metricMessageHandler;
-  private final Map<String, MetricDataBuilder> metricDataBuilderMap;
+
+  private MetricStore metricStore;
 
   public JobStateManager(final PhysicalPlan physicalPlan,
                          final MetricMessageHandler metricMessageHandler,
@@ -102,7 +105,10 @@ public final class JobStateManager {
     this.taskIdToCurrentAttempt = new HashMap<>();
     this.finishLock = new ReentrantLock();
     this.jobFinishedCondition = finishLock.newCondition();
-    this.metricDataBuilderMap = new HashMap<>();
+    this.metricStore = MetricStore.getStore();
+
+    metricStore.getOrCreateMetric(JobMetric.class, jobId).setStageDAG(physicalPlan.getStageDAG());
+    metricStore.triggerBroadcast(JobMetric.class, jobId);
     initializeComputationStates();
   }
 
@@ -140,25 +146,21 @@ public final class JobStateManager {
     LOG.debug("Task State Transition: id {}, from {} to {}",
         new Object[]{taskId, taskState.getCurrentState(), newTaskState});
 
+    metricStore.getOrCreateMetric(TaskMetric.class, taskId)
+        .addEvent((TaskState.State) taskState.getCurrentState(), newTaskState);
+    metricStore.triggerBroadcast(TaskMetric.class, taskId);
+
     taskState.setState(newTaskState);
 
-    // Handle metrics
-    final Map<String, Object> metric = new HashMap<>();
     switch (newTaskState) {
       case ON_HOLD:
       case COMPLETE:
       case FAILED:
       case SHOULD_RETRY:
-        metric.put("ToState", newTaskState);
-        endMeasurement(taskId, metric);
-        break;
       case EXECUTING:
-        metric.put("FromState", newTaskState);
-        beginMeasurement(taskId, metric);
         break;
       case READY:
         final int currentAttempt = taskIdToCurrentAttempt.get(taskId) + 1;
-        metric.put("ScheduleAttempt", currentAttempt);
         if (currentAttempt <= maxScheduleAttempt) {
           taskIdToCurrentAttempt.put(taskId, currentAttempt);
         } else {
@@ -215,21 +217,16 @@ public final class JobStateManager {
   private void onStageStateChanged(final String stageId, final StageState.State newStageState) {
     // Change stage state
     final StateMachine stageStateMachine = idToStageStates.get(stageId).getStateMachine();
+
+    metricStore.getOrCreateMetric(StageMetric.class, stageId)
+        .addEvent(getStageState(stageId), newStageState);
+    metricStore.triggerBroadcast(StageMetric.class, stageId);
+
     LOG.debug("Stage State Transition: id {} from {} to {}",
         new Object[]{stageId, stageStateMachine.getCurrentState(), newStageState});
     stageStateMachine.setState(newStageState);
 
-    // Metric handling
-    final Map<String, Object> metric = new HashMap<>();
-    if (newStageState == StageState.State.INCOMPLETE) {
-      metric.put("FromState", newStageState);
-      beginMeasurement(stageId, metric);
-    } else if (newStageState == StageState.State.COMPLETE) {
-      metric.put("ToState", newStageState);
-      endMeasurement(stageId, metric);
-    }
-
-    // Job becomse COMPLETE
+    // Change job state if needed
     final boolean allStagesCompleted = idToStageStates.values().stream().allMatch(state ->
         state.getStateMachine().getCurrentState().equals(StageState.State.COMPLETE));
     if (allStagesCompleted) {
@@ -243,20 +240,19 @@ public final class JobStateManager {
    * @param newState of the job.
    */
   private void onJobStateChanged(final JobState.State newState) {
+    metricStore.getOrCreateMetric(JobMetric.class, jobId)
+        .addEvent((JobState.State) jobState.getStateMachine().getCurrentState(), newState);
+    metricStore.triggerBroadcast(JobMetric.class, jobId);
+
     jobState.getStateMachine().setState(newState);
 
-    final Map<String, Object> metric = new HashMap<>();
     if (newState == JobState.State.EXECUTING) {
-      LOG.info("Executing Job ID {}...", this.jobId);
-      metric.put("FromState", newState);
-      beginMeasurement(jobId, metric);
+      LOG.debug("Executing Job ID {}...", this.jobId);
     } else if (newState == JobState.State.COMPLETE || newState == JobState.State.FAILED) {
       LOG.info("Job ID {} {}!", new Object[]{jobId, newState});
 
       // Awake all threads waiting the finish of this job.
       finishLock.lock();
-      metric.put("ToState", newState);
-      endMeasurement(jobId, metric);
 
       try {
         jobFinishedCondition.signalAll();
@@ -345,36 +341,6 @@ public final class JobStateManager {
   }
 
   /**
-   * Begins recording the start time of this metric measurement, in addition to the metric given.
-   * This method ensures thread-safety by synchronizing its callers.
-   * @param compUnitId to be used as metricKey
-   * @param initialMetric metric to add
-   */
-  private void beginMeasurement(final String compUnitId, final Map<String, Object> initialMetric) {
-    final MetricDataBuilder metricDataBuilder = new MetricDataBuilder(compUnitId);
-    metricDataBuilder.beginMeasurement(initialMetric);
-    metricDataBuilderMap.put(compUnitId, metricDataBuilder);
-  }
-
-  /**
-   * Ends this metric measurement, recording the end time in addition to the metric given.
-   * This method ensures thread-safety by synchronizing its callers.
-   * @param compUnitId to be used as metricKey
-   * @param finalMetric metric to add
-   */
-  private void endMeasurement(final String compUnitId, final Map<String, Object> finalMetric) {
-    final MetricDataBuilder metricDataBuilder = metricDataBuilderMap.get(compUnitId);
-
-    // may be null when a Task fails without entering the executing state (due to an input read failure)
-    if (metricDataBuilder != null) {
-      finalMetric.put("ContainerId", "Master");
-      metricDataBuilder.endMeasurement(finalMetric);
-      metricMessageHandler.onMetricMessageReceived(compUnitId, metricDataBuilder.build().toJson());
-      metricDataBuilderMap.remove(compUnitId);
-    }
-  }
-
-  /**
    * Stores JSON representation of job state into a file.
    * @param directory the directory which JSON representation is saved to
    * @param suffix suffix for file name
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricBroadcaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricBroadcaster.java
new file mode 100644
index 0000000..f9f0be6
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricBroadcaster.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master;
+
+import org.eclipse.jetty.websocket.api.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * MetricBroadcaster broadcasts metric changes to the currently active WebSocket sessions.
+ */
+public final class MetricBroadcaster {
+  private static final Logger LOG = LoggerFactory.getLogger(MetricBroadcaster.class.getName());
+  private final Set<Session> sessions = ConcurrentHashMap.newKeySet();
+  /**
+   * Private constructor; the single instance is obtained through {@link #getInstance()}.
+   */
+  private MetricBroadcaster() { }
+
+  /**
+   * Getter for the singleton object.
+   * @return MetricBroadcaster object.
+   */
+  public static MetricBroadcaster getInstance() {
+    return InstanceHolder.INSTANCE;
+  }
+
+  /**
+   * Lazy class object holder for MetricBroadcaster class.
+   */
+  private static class InstanceHolder {
+    private static final MetricBroadcaster INSTANCE = new MetricBroadcaster();
+  }
+
+  /**
+   * Send a dump of all stored metrics to the session, then add it to the session set (even if the send fails).
+   * @param session a WebSocket session.
+   */
+  public synchronized void addSession(final Session session) {
+    try {
+      session.getRemote().sendString(MetricStore.getStore().dumpAllMetricToJson());
+    } catch (final IOException e) {
+      LOG.warn("Failed to send initial metric to newly connected session.");
+    }
+    sessions.add(session);
+  }
+
+  /**
+   * Remove a session from the session set.
+   * @param session a WebSocket session.
+   */
+  public synchronized void removeSession(final Session session) {
+    sessions.remove(session);
+  }
+
+  /**
+   * Send a text frame to each registered session; a failed send is logged and the other sessions are still tried.
+   * @param text text to send.
+   */
+  public void broadcast(final String text) {
+    for (final Session session : sessions) {
+      try {
+        session.getRemote().sendString(text);
+      } catch (final IOException e) {
+        LOG.warn("Failed to send string to remote session {}.", session.getRemoteAddress().toString());
+      }
+    }
+  }
+
+  /**
+   * Send a binary frame to each registered session; a failed send is logged and the other sessions are still tried.
+   * @param bytes byte array to send.
+   */
+  public void broadcast(final byte[] bytes) {
+    for (final Session session : sessions) {
+      try {
+        session.getRemote().sendBytes(ByteBuffer.wrap(bytes));
+      } catch (final IOException e) {
+        LOG.warn("Failed to send binary to remote session {}.", session.getRemoteAddress().toString());
+      }
+    }
+  }
+
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricManagerMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricManagerMaster.java
index 5b6a8fd..4a266d5 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricManagerMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricManagerMaster.java
@@ -21,15 +21,11 @@ import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
+import edu.snu.nemo.runtime.common.metric.Metric;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
 /**
  * A default metric message handler.
  */
@@ -37,13 +33,12 @@ import java.util.Map;
 public final class MetricManagerMaster implements MetricMessageHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(MetricManagerMaster.class.getName());
-  private final Map<String, List<String>> compUnitIdToMetricInJson;
+  private final MetricStore metricStore = MetricStore.getStore();
   private boolean isTerminated;
   private final ExecutorRegistry executorRegistry;
 
   @Inject
   private MetricManagerMaster(final ExecutorRegistry executorRegistry) {
-    this.compUnitIdToMetricInJson = new HashMap<>();
     this.isTerminated = false;
     this.executorRegistry = executorRegistry;
   }
@@ -60,24 +55,25 @@ public final class MetricManagerMaster implements MetricMessageHandler {
   }
 
   @Override
-  public synchronized void onMetricMessageReceived(final String metricKey, final String metricValue) {
+  public synchronized void onMetricMessageReceived(final String metricType,
+                                                   final String metricId,
+                                                   final String metricField,
+                                                   final byte[] metricValue) {
     if (!isTerminated) {
-      compUnitIdToMetricInJson.putIfAbsent(metricKey, new LinkedList<>());
-      compUnitIdToMetricInJson.get(metricKey).add(metricValue);
-      LOG.debug("{\"computationUnitId\":\"{}\", \"metricList\":{}}", metricKey, metricValue);
+      final Class<Metric> metricClass = metricStore.getMetricClassByName(metricType);
+      // process metric message
+      try {
+        if (metricStore.getOrCreateMetric(metricClass, metricId).processMetricMessage(metricField, metricValue)) {
+          metricStore.triggerBroadcast(metricClass, metricId);
+        }
+      } catch (final Exception e) {
+        LOG.warn("Error when processing metric message for {}, {}, {}.", metricType, metricId, metricField);
+      }
     }
   }
 
   @Override
-  public synchronized List<String> getMetricByKey(final String metricKey) {
-    return compUnitIdToMetricInJson.get(metricKey);
-  }
-
-  @Override
   public synchronized void terminate() {
-    compUnitIdToMetricInJson.forEach((compUnitId, metricList) ->
-        LOG.info("{\"computationUnitId\":\"{}\", \"metricList\":{}}", compUnitId, metricList));
-    compUnitIdToMetricInJson.clear();
     isTerminated = true;
   }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricMessageHandler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricMessageHandler.java
index 334bc0c..15ee083 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricMessageHandler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricMessageHandler.java
@@ -17,8 +17,6 @@ package edu.snu.nemo.runtime.master;
 
 import org.apache.reef.tang.annotations.DefaultImplementation;
 
-import java.util.List;
-
 /**
  * Metric message handler.
  */
@@ -27,17 +25,13 @@ public interface MetricMessageHandler {
 
   /**
    * Handle the received metric message.
-   * @param metricKey a given key for the metric (ex. Task ID)
-   * @param metricValue the metric formatted as a string (ex. JSON).
-   */
-  void onMetricMessageReceived(final String metricKey, final String metricValue);
-
-  /**
-   * Retrieves the string form of metric given the metric key.
-   * @param metricKey to retrieve the metric for
-   * @return the list of accumulated metric in string (ex. JSON)
+   * @param metricType a given type for the metric (ex. TaskMetric).
+   * @param metricId  id of the metric.
+   * @param metricField field name of the metric.
+   * @param metricValue serialized metric data value.
    */
-  List<String> getMetricByKey(final String metricKey);
+  void onMetricMessageReceived(final String metricType, final String metricId,
+                               final String metricField, final byte[] metricValue);
 
   /**
    * Cleans up and terminates this handler.
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricStore.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricStore.java
new file mode 100644
index 0000000..09c7c9d
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricStore.java
@@ -0,0 +1,254 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import edu.snu.nemo.common.exception.UnsupportedMetricException;
+import edu.snu.nemo.runtime.common.metric.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * MetricStore stores metric data which will be used by the web visualization interface, logging, and so on.
+ * All metric classes should be JSON-serializable by {@link ObjectMapper}.
+ */
+public final class MetricStore {
+  private final Map<Class, Map<String, Object>> metricMap = new HashMap<>();
+  // Supported metric classes keyed by simple name; add an entry here to support another metric type.
+  private final Map<String, Class> metricList = new HashMap<String, Class>() {
+    {
+      put("JobMetric", JobMetric.class);
+      put("StageMetric", StageMetric.class);
+      put("TaskMetric", TaskMetric.class);
+    }
+  };
+
+  /**
+   * Private constructor.
+   */
+  private MetricStore() { }
+
+  /**
+   * Getter for singleton instance.
+   * @return MetricStore object.
+   */
+  public static MetricStore getStore() {
+    return InstanceHolder.INSTANCE;
+  }
+
+  /**
+   * Lazy class object holder for MetricStore class.
+   */
+  private static class InstanceHolder {
+    private static final MetricStore INSTANCE = new MetricStore();
+  }
+
+  public <T extends Metric> Class<T> getMetricClassByName(final String className) {
+    if (!metricList.keySet().contains(className)) {
+      throw new NoSuchElementException();
+    }
+
+    return metricList.get(className);
+  }
+
+  /**
+   * Store a metric object. Its class must be registered in metricList, else UnsupportedMetricException is thrown.
+   * The metric is stored in a {@link Map} under its id; a metric already stored under that id is kept.
+   * @param metric metric object.
+   * @param <T> class of metric
+   */
+  public <T extends Metric> void putMetric(final T metric) {
+    final Class metricClass = metric.getClass();
+    if (!metricList.values().contains(metricClass)) {
+      throw new UnsupportedMetricException(new Throwable("Unsupported metric"));
+    }
+
+    metricMap.computeIfAbsent(metricClass, k -> new HashMap<>()).putIfAbsent(metric.getId(), metric);
+  }
+
+  /**
+   * Fetch a stored metric by its class and id; throws {@link NoSuchElementException} if none is stored.
+   * @param metricClass class instance of metric.
+   * @param id metric id, which can be fetched by getId() method.
+   * @param <T> class of metric
+   * @return the stored metric object.
+   */
+  public <T extends Metric> T getMetricWithId(final Class<T> metricClass, final String id) {
+    final T metric = (T) metricMap.computeIfAbsent(metricClass, k -> new HashMap<>()).get(id);
+    if (metric == null) {
+      throw new NoSuchElementException("No metric found");
+    }
+    return metric;
+  }
+
+  /**
+   * Fetch the id-to-metric map of a metric class, creating an empty map if none exists yet.
+   * @param metricClass class instance of metric.
+   * @param <T> class of metric
+   * @return the map from metric id to metric object for the given class.
+   */
+  public <T extends Metric> Map<String, Object> getMetricMap(final Class<T> metricClass) {
+    final Map<String, Object> metric = metricMap.computeIfAbsent(metricClass, k -> new HashMap<>());
+    if (metric == null) {
+      throw new NoSuchElementException("No metric found");
+    }
+    return metric;
+  }
+
+  /**
+   * Same as getMetricWithId(), but if there is no such metric, it will try to create new metric object
+   * using its constructor, which takes an id as a parameter.
+   * @param metricClass class of metric.
+   * @param id metric id, which can be fetched by getId() method.
+   * @param <T> class of metric
+   * @return the existing metric, or a newly created and stored one if there was none.
+   */
+  public <T extends Metric> T getOrCreateMetric(final Class<T> metricClass, final String id) {
+    T metric =  (T) metricMap.computeIfAbsent(metricClass, k -> new HashMap<>()).get(id);
+    if (metric == null) {
+      try {
+        metric = metricClass.getConstructor(new Class[]{String.class}).newInstance(id);
+        putMetric(metric);
+      } catch (final Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return metric;
+  }
+
+  private void generatePreprocessedJsonFromMetricEntry(final Map.Entry<String, Object> idToMetricEntry,
+                                                       final JsonGenerator jsonGenerator,
+                                                       final ObjectMapper objectMapper) throws IOException {
+    final JsonNode jsonNode = objectMapper.valueToTree(idToMetricEntry.getValue());
+    jsonGenerator.writeFieldName(idToMetricEntry.getKey());
+    jsonGenerator.writeStartObject();
+    jsonGenerator.writeFieldName("id");
+    jsonGenerator.writeString(idToMetricEntry.getKey());
+    jsonGenerator.writeFieldName("data");
+    jsonGenerator.writeTree(jsonNode);
+    jsonGenerator.writeEndObject();
+  }
+
+  /**
+   * Dumps a JSON-serialized string of all stored metrics of the given class.
+   * @param metricClass class of metric.
+   * @return dumped JSON string. NOTE(review): toString() decodes the UTF-8 output with the default charset.
+   * @throws IOException when failed to write json.
+   */
+  public <T extends Metric> String dumpMetricToJson(final Class<T> metricClass) throws IOException {
+    final ObjectMapper objectMapper = new ObjectMapper();
+    final JsonFactory jsonFactory = new JsonFactory();
+    final ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    final JsonGenerator jsonGenerator = jsonFactory.createGenerator(stream, JsonEncoding.UTF8);
+    jsonGenerator.setCodec(objectMapper);
+
+    jsonGenerator.writeStartObject();
+    jsonGenerator.writeFieldName(metricClass.getSimpleName());
+    jsonGenerator.writeStartObject();
+    for (final Map.Entry<String, Object> idToMetricEntry : getMetricMap(metricClass).entrySet()) {
+      generatePreprocessedJsonFromMetricEntry(idToMetricEntry, jsonGenerator, objectMapper);
+    }
+    jsonGenerator.writeEndObject();
+    jsonGenerator.writeEndObject();
+
+    jsonGenerator.close();
+    return stream.toString();
+  }
+
+  /**
+   * Dumps a JSON-serialized string of all stored metrics, grouped by metric class name.
+   * @return dumped JSON string. NOTE(review): toString() decodes the UTF-8 output with the default charset.
+   * @throws IOException when failed to write json.
+   */
+  public synchronized String dumpAllMetricToJson() throws IOException {
+    final ObjectMapper objectMapper = new ObjectMapper();
+    final JsonFactory jsonFactory = new JsonFactory();
+    final ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    final JsonGenerator jsonGenerator = jsonFactory.createGenerator(stream, JsonEncoding.UTF8);
+    jsonGenerator.setCodec(objectMapper);
+
+    jsonGenerator.writeStartObject();
+    for (final Map.Entry<Class, Map<String, Object>> metricMapEntry : metricMap.entrySet()) {
+      jsonGenerator.writeFieldName(metricMapEntry.getKey().getSimpleName());
+      jsonGenerator.writeStartObject();
+      for (final Map.Entry<String, Object> idToMetricEntry : metricMapEntry.getValue().entrySet()) {
+        generatePreprocessedJsonFromMetricEntry(idToMetricEntry, jsonGenerator, objectMapper);
+      }
+      jsonGenerator.writeEndObject();
+    }
+    jsonGenerator.writeEndObject();
+
+    jsonGenerator.close();
+    return stream.toString();
+  }
+
+  /**
+   * Same as dumpAllMetricToJson(), but writes the dump to a file; an IOException is rethrown as RuntimeException.
+   * @param filePath path to dump JSON.
+   */
+  public void dumpAllMetricToFile(final String filePath) {
+    try {
+      final String jsonDump = dumpAllMetricToJson();
+      final BufferedWriter writer = new BufferedWriter(new FileWriter(filePath));
+
+      writer.write(jsonDump);
+      writer.close();
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+
+  /**
+   * Send the JSON form of one stored metric to {@link MetricBroadcaster}, which will broadcast it to
+   * all active WebSocket sessions. This method should be called manually after a metric has changed.
+   * It is synchronized, and throws NoSuchElementException if no metric with that id is stored.
+   * @param metricClass class of the metric.
+   * @param id id of the metric.
+   */
+  public synchronized <T extends Metric> void triggerBroadcast(final Class<T> metricClass, final String id) {
+    final MetricBroadcaster metricBroadcaster = MetricBroadcaster.getInstance();
+    final ObjectMapper objectMapper = new ObjectMapper();
+    final T metric = getMetricWithId(metricClass, id);
+    final JsonFactory jsonFactory = new JsonFactory();
+    final ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    final JsonGenerator jsonGenerator;
+    try {
+      jsonGenerator = jsonFactory.createGenerator(stream, JsonEncoding.UTF8);
+
+      jsonGenerator.setCodec(objectMapper);
+
+      jsonGenerator.writeStartObject();
+      jsonGenerator.writeFieldName("metricType");
+      jsonGenerator.writeString(metricClass.getSimpleName());
+
+      jsonGenerator.writeFieldName("data");
+      jsonGenerator.writeObject(metric);
+      jsonGenerator.writeEndObject();
+
+      jsonGenerator.close();
+
+      metricBroadcaster.broadcast(stream.toString());
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 6d409d8..98928a3 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -26,6 +26,7 @@ import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageListener;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.state.TaskState;
+import edu.snu.nemo.runtime.master.servlet.*;
 import edu.snu.nemo.runtime.master.resource.ContainerManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import edu.snu.nemo.runtime.master.resource.ResourceSpecification;
@@ -38,6 +39,8 @@ import org.apache.reef.driver.evaluator.AllocatedEvaluator;
 import org.apache.reef.driver.evaluator.FailedEvaluator;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.annotations.Parameter;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,6 +73,7 @@ public final class RuntimeMaster {
   private static final Logger LOG = LoggerFactory.getLogger(RuntimeMaster.class.getName());
   private static final int DAG_LOGGING_PERIOD = 3000;
   private static final int METRIC_ARRIVE_TIMEOUT = 10000;
+  private static final int REST_SERVER_PORT = 10101;
 
   private final ExecutorService runtimeMasterThread;
 
@@ -78,6 +82,7 @@ public final class RuntimeMaster {
   private final BlockManagerMaster blockManagerMaster;
   private final MetricMessageHandler metricMessageHandler;
   private final MessageEnvironment masterMessageEnvironment;
+  private final MetricStore metricStore;
   private final Map<Integer, Long> aggregatedMetricData;
   private final ClientRPC clientRPC;
   private final MetricManagerMaster metricManagerMaster;
@@ -91,6 +96,9 @@ public final class RuntimeMaster {
   private final AtomicInteger resourceRequestCount;
 
   private CountDownLatch metricCountDownLatch;
+  // REST API server for web metric visualization ui.
+  private final Server metricServer;
+
 
   @Inject
   public RuntimeMaster(final Scheduler scheduler,
@@ -120,6 +128,29 @@ public final class RuntimeMaster {
     this.resourceRequestCount = new AtomicInteger(0);
     this.objectMapper = new ObjectMapper();
     this.aggregatedMetricData = new HashMap<>();
+    this.metricStore = MetricStore.getStore();
+    this.metricServer = startRestMetricServer();
+  }
+
+  private Server startRestMetricServer() {
+    final Server server = new Server(REST_SERVER_PORT);
+
+    final ServletHandler servletHandler = new ServletHandler();
+    server.setHandler(servletHandler);
+
+    servletHandler.addServletWithMapping(JobMetricServlet.class, "/api/job");
+    servletHandler.addServletWithMapping(TaskMetricServlet.class, "/api/task");
+    servletHandler.addServletWithMapping(StageMetricServlet.class, "/api/stage");
+    servletHandler.addServletWithMapping(AllMetricServlet.class, "/api");
+    servletHandler.addServletWithMapping(WebSocketMetricServlet.class, "/api/websocket");
+
+    try {
+      server.start();
+    } catch (final Exception e) {
+      throw new RuntimeException("Failed to start REST API server.");
+    }
+
+    return server;
   }
 
   /**
@@ -172,6 +203,15 @@ public final class RuntimeMaster {
       metricMessageHandler.terminate();
       containerManager.terminate();
 
+      // TODO #?: parameterize file path using Tang
+      metricStore.dumpAllMetricToFile("/tmp/dump");
+
+      try {
+        metricServer.stop();
+      } catch (final Exception e) {
+        throw new RuntimeException("Failed to stop rest api server.");
+      }
+
     });
 
     // Do not shutdown runtimeMasterThread. We need it to clean things up.
@@ -319,7 +359,9 @@ public final class RuntimeMaster {
       case MetricMessageReceived:
         final List<ControlMessage.Metric> metricList = message.getMetricMsg().getMetricList();
         metricList.forEach(metric ->
-            metricMessageHandler.onMetricMessageReceived(metric.getMetricKey(), metric.getMetricValue()));
+            metricMessageHandler.onMetricMessageReceived(
+                metric.getMetricType(), metric.getMetricId(),
+                metric.getMetricField(), metric.getMetricValue().toByteArray()));
         break;
       case ExecutorDataCollected:
         final String serializedData = message.getDataCollected().getData();
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/AllMetricServlet.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/AllMetricServlet.java
new file mode 100644
index 0000000..00a828e
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/AllMetricServlet.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.servlet;
+
+import edu.snu.nemo.runtime.master.MetricStore;
+
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+
+/**
+ * Servlet which answers GET requests with a JSON dump of every metric held by {@link MetricStore}.
+ */
+public final class AllMetricServlet extends HttpServlet {
+
+  @Override
+  protected void doGet(final HttpServletRequest request, final HttpServletResponse response)
+      throws IOException {
+    final MetricStore metricStore = MetricStore.getStore();
+    response.setContentType("application/json");
+    response.setStatus(HttpServletResponse.SC_OK);
+    response.getWriter().println(metricStore.dumpAllMetricToJson());
+  }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/JobMetricServlet.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/JobMetricServlet.java
new file mode 100644
index 0000000..9a38d89
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/JobMetricServlet.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.servlet;
+
+import edu.snu.nemo.runtime.master.MetricStore;
+import edu.snu.nemo.runtime.common.metric.JobMetric;
+
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+
+/**
+ * Servlet which answers GET requests with a JSON dump of all stored {@link JobMetric} data.
+ */
+public final class JobMetricServlet extends HttpServlet {
+
+  @Override
+  protected void doGet(final HttpServletRequest request, final HttpServletResponse response)
+          throws IOException {
+    final MetricStore metricStore = MetricStore.getStore();
+    response.setContentType("application/json");
+    response.setStatus(HttpServletResponse.SC_OK);
+    response.getWriter().println(metricStore.dumpMetricToJson(JobMetric.class));
+  }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/StageMetricServlet.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/StageMetricServlet.java
new file mode 100644
index 0000000..cb5860a
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/StageMetricServlet.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.servlet;
+
+import edu.snu.nemo.runtime.master.MetricStore;
+import edu.snu.nemo.runtime.common.metric.StageMetric;
+
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+
+/**
+ * Servlet which answers GET requests with a JSON dump of all stored {@link StageMetric} data.
+ */
+public final class StageMetricServlet extends HttpServlet {
+
+  @Override
+  protected void doGet(final HttpServletRequest request, final HttpServletResponse response)
+      throws IOException {
+    final MetricStore metricStore = MetricStore.getStore();
+    response.setContentType("application/json");
+    response.setStatus(HttpServletResponse.SC_OK);
+    response.getWriter().println(metricStore.dumpMetricToJson(StageMetric.class));
+  }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/TaskMetricServlet.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/TaskMetricServlet.java
new file mode 100644
index 0000000..4e338fa
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/TaskMetricServlet.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.servlet;
+
+import edu.snu.nemo.runtime.master.MetricStore;
+import edu.snu.nemo.runtime.common.metric.TaskMetric;
+
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+
+/**
+ * Servlet which answers GET requests with a JSON dump of all stored {@link TaskMetric} data.
+ */
+public final class TaskMetricServlet extends HttpServlet {
+
+  @Override
+  protected void doGet(final HttpServletRequest request, final HttpServletResponse response)
+      throws IOException {
+    final MetricStore metricStore = MetricStore.getStore();
+    response.setContentType("application/json");
+    response.setStatus(HttpServletResponse.SC_OK);
+    response.getWriter().println(metricStore.dumpMetricToJson(TaskMetric.class));
+  }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/WebSocketMetricAdapter.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/WebSocketMetricAdapter.java
new file mode 100644
index 0000000..6af2695
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/WebSocketMetricAdapter.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.servlet;
+
+import edu.snu.nemo.runtime.master.MetricBroadcaster;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.StatusCode;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Jetty WebSocket adapter which registers its session with {@link MetricBroadcaster} and echoes text messages. */
+public class WebSocketMetricAdapter extends WebSocketAdapter {
+  private static final Logger LOG = LoggerFactory.getLogger(WebSocketMetricAdapter.class.getName());
+  private Session session;
+
+  /** Remembers the newly opened session and registers it with the {@link MetricBroadcaster}. */
+  @Override
+  public final void onWebSocketConnect(final Session sess) {
+    this.session = sess;
+    MetricBroadcaster.getInstance().addSession(this.session);
+  }
+  /** Unregisters the session, logging a warning when the close status is not {@link StatusCode#NORMAL}. */
+  @Override
+  public final void onWebSocketClose(final int statusCode, final String reason) {
+    if (statusCode != StatusCode.NORMAL) {
+      LOG.warn("WebSocket session closed abnormally: {} - {}.", statusCode, reason);
+    }
+    MetricBroadcaster.getInstance().removeSession(session);
+  }
+  /** Logs the failure so that it is not lost, then unregisters the session. */
+  @Override
+  public final void onWebSocketError(final Throwable throwable) {
+    LOG.warn("WebSocket session failed with an error.", throwable);
+    MetricBroadcaster.getInstance().removeSession(session);
+  }
+  /** Echoes the received text back to its sender, wrapping any {@link IOException} in a RuntimeException. */
+  @Override
+  public final void onWebSocketText(final String text) {
+    try {
+      session.getRemote().sendString(text);
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/parameter/MetricFlushPeriod.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/WebSocketMetricServlet.java
similarity index 57%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/parameter/MetricFlushPeriod.java
rename to runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/WebSocketMetricServlet.java
index d81db8c..e164501 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/parameter/MetricFlushPeriod.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/WebSocketMetricServlet.java
@@ -13,14 +13,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.common.metric.parameter;
+package edu.snu.nemo.runtime.master.servlet;
 
-import org.apache.reef.tang.annotations.Name;
-import org.apache.reef.tang.annotations.NamedParameter;
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
 
 /**
- * Metric flushing period.
+ * Servlet which handles WebSocket HTTP requests.
  */
-@NamedParameter(doc = "Metric flushing period (ms)", short_name = "mf_period", default_value = "5000")
-public final class MetricFlushPeriod implements Name<Long> {
+public class WebSocketMetricServlet extends WebSocketServlet {
+  /** Configures the given factory so that it uses {@link WebSocketMetricAdapter} as its WebSocket class. */
+  @Override
+  public final void configure(final WebSocketServletFactory factory) {
+    // Register the adapter class (not an instance) with the factory.
+    factory.register(WebSocketMetricAdapter.class);
+  }
 }
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/MetricStoreTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/MetricStoreTest.java
new file mode 100644
index 0000000..54c2028
--- /dev/null
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/MetricStoreTest.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master;
+
+import com.fasterxml.jackson.core.TreeNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import edu.snu.nemo.runtime.common.metric.JobMetric;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for {@link MetricStore}.
+ */
+public final class MetricStoreTest {
+  /** Checks that a created {@link JobMetric} shows up in the JSON dump under its class name and id. */
+  @Test
+  public void testJson() throws IOException {
+    final MetricStore metricStore = MetricStore.getStore();
+    metricStore.getOrCreateMetric(JobMetric.class, "testId");
+    final String json = metricStore.dumpMetricToJson(JobMetric.class);
+
+    final ObjectMapper objectMapper = new ObjectMapper();
+    final TreeNode treeNode = objectMapper.readTree(json);
+    // The dump is expected to be keyed by metric class name, then by metric id.
+    final TreeNode jobMetricNode = treeNode.get("JobMetric");
+    assertNotNull(jobMetricNode);
+    final TreeNode metricNode = jobMetricNode.get("testId");
+    assertNotNull(metricNode);
+
+    // Guard against a missing field so the test fails with an assertion instead of a NullPointerException.
+    final TreeNode fieldNode = metricNode.get("id");
+    assertNotNull(fieldNode);
+    assertTrue(fieldNode.isValueNode());
+  }
+}


Mime
View raw message