nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jan...@apache.org
Subject [incubator-nemo] branch master updated: [NEMO-293] OOM exception in streaming (#164)
Date Mon, 03 Dec 2018 04:37:59 GMT
This is an automated email from the ASF dual-hosted git repository.

jangho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new d2c5c44  [NEMO-293] OOM exception in streaming (#164)
d2c5c44 is described below

commit d2c5c4476f6319cf9d91fbc55b0e6f00d4b229ef
Author: Taegeon Um <taegeonum@gmail.com>
AuthorDate: Mon Dec 3 13:37:55 2018 +0900

    [NEMO-293] OOM exception in streaming (#164)
    
    JIRA: [NEMO-293: OOM exception in streaming](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-293)
    
    **Major changes:**
    - Fix wrong byte encoding in `PipeOutputWriter`. This causes OOM because it sends unnecessary bytes (count <= byte array size)
    - Add `writeElement` method to `ByteOutputContext` to emit data without copying byte array.
---
 .../executor/bytetransfer/ByteOutputContext.java   | 36 +++++++++++++++++++---
 .../datatransfer/DataFetcherOutputCollector.java   |  2 ++
 .../executor/datatransfer/PipeOutputWriter.java    | 15 +--------
 3 files changed, 34 insertions(+), 19 deletions(-)

diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
index 315760c..12761b2 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
@@ -18,10 +18,14 @@
  */
 package org.apache.nemo.runtime.executor.bytetransfer;
 
+import io.netty.buffer.ByteBufOutputStream;
+import org.apache.nemo.common.coder.EncoderFactory;
+import org.apache.nemo.runtime.executor.data.DataUtil;
 import org.apache.nemo.runtime.executor.data.FileArea;
 import org.apache.nemo.runtime.executor.data.partition.SerializedPartition;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.*;
+import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -94,7 +98,7 @@ public final class ByteOutputContext extends ByteTransferContext implements Auto
       currentByteOutputStream.close();
     }
     channel.writeAndFlush(DataFrameEncoder.DataFrame.newInstance(getContextId()))
-        .addListener(getChannelWriteListener());
+      .addListener(getChannelWriteListener());
     deregister();
     closed = true;
   }
@@ -150,7 +154,7 @@ public final class ByteOutputContext extends ByteTransferContext implements Auto
      * @throws IOException when an exception has been set or this stream was closed
      */
     public ByteOutputStream writeSerializedPartition(final SerializedPartition serializedPartition)
-        throws IOException {
+      throws IOException {
       write(serializedPartition.getData(), 0, serializedPartition.getLength());
       return this;
     }
@@ -177,7 +181,7 @@ public final class ByteOutputContext extends ByteTransferContext implements Auto
     }
 
     @Override
-    public synchronized void close() throws IOException {
+    public void close() throws IOException {
       if (closed) {
         return;
       }
@@ -199,18 +203,40 @@ public final class ByteOutputContext extends ByteTransferContext implements Auto
     }
 
     /**
+     * Write an element to the channel.
+     * @param element element
+     * @param serializer serializer
+     */
+    public void writeElement(final Object element,
+                             final Serializer serializer) {
+      final ByteBuf byteBuf = channel.alloc().ioBuffer();
+      final ByteBufOutputStream byteBufOutputStream = new ByteBufOutputStream(byteBuf);
+      try {
+        final OutputStream wrapped =
+          DataUtil.buildOutputStream(byteBufOutputStream, serializer.getEncodeStreamChainers());
+        final EncoderFactory.Encoder encoder = serializer.getEncoderFactory().create(wrapped);
+        encoder.encode(element);
+        wrapped.close();
+
+        writeByteBuf(byteBuf);
+      } catch (final IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    /**
      * Writes a data frame.
      * @param body        the body or {@code null}
      * @param length      the length of the body, in bytes
      * @throws IOException when an exception has been set or this stream was closed
      */
-    private synchronized void writeDataFrame(final Object body, final long length) throws IOException {
+    private void writeDataFrame(final Object body, final long length) throws IOException {
       ensureNoException();
       if (closed) {
         throw new IOException("Stream already closed.");
       }
      channel.writeAndFlush(DataFrameEncoder.DataFrame.newInstance(getContextId(), body, length, newSubStream))
-          .addListener(getChannelWriteListener());
+        .addListener(getChannelWriteListener());
       newSubStream = false;
     }
   }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
index d50ad82..9995e6a 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
@@ -37,6 +37,8 @@ public final class DataFetcherOutputCollector<O> implements OutputCollector<O> {
   /**
    * It forwards output to the next operator.
    * @param nextOperatorVertex next operator to emit data and watermark
+   * @param edgeIndex edge index
+   * @param watermarkManager watermark manager
    */
   public DataFetcherOutputCollector(final OperatorVertex nextOperatorVertex,
                                     final int edgeIndex,
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
index 03d7470..f937975 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
@@ -18,14 +18,11 @@
  */
 package org.apache.nemo.runtime.executor.datatransfer;
 
-import org.apache.nemo.common.DirectByteArrayOutputStream;
-import org.apache.nemo.common.coder.EncoderFactory;
 import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.plan.RuntimeEdge;
 import org.apache.nemo.runtime.executor.bytetransfer.ByteOutputContext;
-import org.apache.nemo.runtime.executor.data.DataUtil;
 import org.apache.nemo.runtime.executor.data.PipeManagerWorker;
 import org.apache.nemo.runtime.executor.data.partitioner.Partitioner;
 import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
@@ -33,7 +30,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
@@ -79,16 +75,7 @@ public final class PipeOutputWriter implements OutputWriter {
   private void writeData(final Object element, final List<ByteOutputContext> pipeList) {
     pipeList.forEach(pipe -> {
       try (final ByteOutputContext.ByteOutputStream pipeToWriteTo = pipe.newOutputStream()) {
-        // Serialize (Do not compress)
-        final DirectByteArrayOutputStream bytesOutputStream = new DirectByteArrayOutputStream();
-        final OutputStream wrapped =
-          DataUtil.buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers());
-        final EncoderFactory.Encoder encoder = serializer.getEncoderFactory().create(wrapped);
-        encoder.encode(element);
-        wrapped.close();
-
-        // Write
-        pipeToWriteTo.write(bytesOutputStream.getBufDirectly());
+        pipeToWriteTo.writeElement(element, serializer);
       } catch (IOException e) {
         throw new RuntimeException(e); // For now we crash the executor on IOException
       }


Mime
View raw message