Date: Tue, 19 Jun 2018 05:04:06 +0000
From: jangho@apache.org
To: "commits@nemo.apache.org"
Reply-To: dev@nemo.apache.org
Subject: [incubator-nemo] branch master updated: [NEMO-72] Instance-based Encoder/Decoder interface (#48)
X-Git-Repo: incubator-nemo
X-Git-Refname: refs/heads/master
X-Git-Oldrev: 39482ae358fbe343a51060d90e46e3bf7bbf0f15
X-Git-Newrev: 5dd93acb696496380cbb25a84121c4d1cab0b71a

This is an automated email from the ASF dual-hosted git repository.
jangho pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git

The following commit(s) were added to refs/heads/master by this push:
     new 5dd93ac  [NEMO-72] Instance-based Encoder/Decoder interface (#48)

5dd93ac is described below

commit 5dd93acb696496380cbb25a84121c4d1cab0b71a
Author: Sanha Lee
AuthorDate: Tue Jun 19 14:04:04 2018 +0900

[NEMO-72] Instance-based Encoder/Decoder interface (#48)

JIRA: [NEMO-72: Instance-based Encoder/Decoder interface](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-72)

**Major changes:**
- Change the `Coder` interface from a static style to an instance-based style, so that coders that must be instantiated (such as the Spark serializer) are not re-created for every encoding.
- Split `Coder` into `Encoder` and `Decoder`, to support use cases that separate encoding from decoding, such as [#32](https://issues.apache.org/jira/browse/NEMO-32).
- Split `StreamChainer` into `EncodeStreamChainer` and `DecodeStreamChainer`.

**Minor changes to note:**
- Split `CoderProperty` into `EncoderProperty` and `DecoderProperty`.
- Split `CompressionProperty` into `CompressionProperty` and `DecompressionProperty`.

**Tests for the changes:**
- Existing unit tests for `Coder`, `CoderProperty`, `Compression`, `CompressionProperty`, and other data-plane tests cover this change.
- Existing integration tests also cover this change.

**Other comments:**
- N/A.
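To illustrate the instance-based pattern described under **Major changes**, here is a minimal sketch. It is not Nemo code: it re-declares simplified local copies of the `EncoderFactory`/`DecoderFactory` shape from this diff so that it compiles without the Nemo jars, and the names `CoderSketch` and `roundTrip` are hypothetical. The factory is created once per stream via `create(...)`, and the resulting encoder or decoder is then reused for every element.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public final class CoderSketch {

  // Simplified local copy of the EncoderFactory shape in this commit (assumption: not the Nemo class).
  interface EncoderFactory<T> extends Serializable {
    Encoder<T> create(OutputStream outputStream) throws IOException;

    interface Encoder<T> extends Serializable {
      void encode(T element) throws IOException;
    }
  }

  // Simplified local copy of the DecoderFactory shape in this commit (assumption: not the Nemo class).
  interface DecoderFactory<T> extends Serializable {
    Decoder<T> create(InputStream inputStream) throws IOException;

    interface Decoder<T> extends Serializable {
      T decode() throws IOException;
    }
  }

  // The factory is stateless; per-stream state (the DataOutputStream wrapper) lives in the Encoder.
  static final class IntEncoderFactory implements EncoderFactory<Integer> {
    @Override
    public Encoder<Integer> create(final OutputStream outputStream) {
      final DataOutputStream out = new DataOutputStream(outputStream);
      return element -> out.writeInt(element);
    }
  }

  // Likewise, the DataInputStream wrapper is created once per input stream, not per element.
  static final class IntDecoderFactory implements DecoderFactory<Integer> {
    @Override
    public Decoder<Integer> create(final InputStream inputStream) {
      final DataInputStream in = new DataInputStream(inputStream);
      return () -> in.readInt();
    }
  }

  // Encodes all values with one encoder, then decodes them back with one decoder.
  static List<Integer> roundTrip(final List<Integer> values) throws IOException {
    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    final EncoderFactory.Encoder<Integer> encoder = new IntEncoderFactory().create(bytes);
    for (final Integer value : values) {
      encoder.encode(value);
    }
    final DecoderFactory.Decoder<Integer> decoder =
        new IntDecoderFactory().create(new ByteArrayInputStream(bytes.toByteArray()));
    final List<Integer> decoded = new ArrayList<>();
    for (int i = 0; i < values.size(); i++) {
      decoded.add(decoder.decode());
    }
    return decoded;
  }

  public static void main(final String[] args) throws IOException {
    System.out.println(roundTrip(Arrays.asList(1, 2, 3))); // prints [1, 2, 3]
  }
}
```

In this diff, `EncoderProperty` and `DecoderProperty` hold the factories rather than the coders, and the factories extend `Serializable`. Keeping them stateless suits that role. Any expensive per-stream setup, such as a Spark serializer instance, then belongs in `create(...)` and is paid once per stream instead of once per element.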
resolves [NEMO-72](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-72) --- bin/json2dot.py | 15 ++- .../java/edu/snu/nemo/common/coder/BytesCoder.java | 53 -------- .../main/java/edu/snu/nemo/common/coder/Coder.java | 77 ----------- .../edu/snu/nemo/common/coder/DecoderFactory.java | 91 +++++++++++++ .../edu/snu/nemo/common/coder/EncoderFactory.java | 92 +++++++++++++ .../java/edu/snu/nemo/common/coder/IntCoder.java | 52 -------- .../snu/nemo/common/coder/IntDecoderFactory.java | 70 ++++++++++ .../snu/nemo/common/coder/IntEncoderFactory.java | 70 ++++++++++ .../java/edu/snu/nemo/common/coder/PairCoder.java | 83 ------------ .../snu/nemo/common/coder/PairDecoderFactory.java | 95 +++++++++++++ .../snu/nemo/common/coder/PairEncoderFactory.java | 96 ++++++++++++++ .../executionproperty/CompressionProperty.java | 2 + .../{CoderProperty.java => DecoderProperty.java} | 12 +- ...derProperty.java => DecompressionProperty.java} | 12 +- .../{CoderProperty.java => EncoderProperty.java} | 12 +- .../edu/snu/nemo/common/ir/LoopVertexTest.java | 1 - .../ExecutionPropertyMapTest.java | 17 +-- .../frontend/beam/NemoPipelineVisitor.java | 84 +++++++----- .../compiler/frontend/beam/coder/BeamCoder.java | 70 ---------- .../frontend/beam/coder/BeamDecoderFactory.java | 147 +++++++++++++++++++++ .../frontend/beam/coder/BeamEncoderFactory.java | 112 ++++++++++++++++ .../frontend/beam/coder/FloatArrayCoder.java | 2 +- .../frontend/beam/coder/IntArrayCoder.java | 2 +- .../compiler/frontend/spark/coder/SparkCoder.java | 51 ------- .../frontend/spark/coder/SparkDecoderFactory.java | 71 ++++++++++ .../frontend/spark/coder/SparkEncoderFactory.java | 71 ++++++++++ .../frontend/spark/core/SparkFrontendUtils.java | 15 ++- .../frontend/spark/core/rdd/PairRDDFunctions.scala | 9 +- .../compiler/frontend/spark/core/rdd/RDD.scala | 22 +-- .../compiletime/annotating/CompressionPass.java | 1 + ...CompressionPass.java => DecompressionPass.java} | 25 ++-- ...eCoderPass.java => 
DefaultEdgeDecoderPass.java} | 19 +-- ...eCoderPass.java => DefaultEdgeEncoderPass.java} | 19 +-- .../composite/PrimitiveCompositePass.java | 6 +- .../CommonSubexpressionEliminationPass.java | 16 ++- .../reshaping/DataSkewReshapingPass.java | 6 +- .../compiletime/reshaping/LoopOptimizations.java | 6 +- .../reshaping/SailfishRelayReshapingPass.java | 6 +- .../spark/sql/JavaUserDefinedTypedAggregation.java | 4 +- .../edu/snu/nemo/runtime/executor/Executor.java | 22 ++- .../snu/nemo/runtime/executor/data/DataUtil.java | 103 +++++++++------ .../runtime/executor/data/SerializerManager.java | 54 +++++--- .../data/partition/SerializedPartition.java | 12 +- .../streamchainer/CompressionStreamChainer.java | 19 +-- ...StreamChainer.java => DecodeStreamChainer.java} | 17 +-- ...hainer.java => DecompressionStreamChainer.java} | 21 +-- ...StreamChainer.java => EncodeStreamChainer.java} | 15 +-- .../executor/data/streamchainer/Serializer.java | 64 +++++---- .../nemo/runtime/executor/data/BlockStoreTest.java | 13 +- .../executor/datatransfer/DataTransferTest.java | 53 ++++---- .../runtime/executor/task/TaskExecutorTest.java | 1 - .../annotating/DefaultEdgeCoderPassTest.java | 51 ++++--- .../compiletime/reshaping/LoopFusionPassTest.java | 9 +- .../reshaping/LoopInvariantCodeMotionPassTest.java | 6 +- .../optimizer/policy/PolicyBuilderTest.java | 6 +- 55 files changed, 1334 insertions(+), 746 deletions(-) diff --git a/bin/json2dot.py b/bin/json2dot.py index 20fa177..6f2d339 100755 --- a/bin/json2dot.py +++ b/bin/json2dot.py @@ -305,7 +305,8 @@ class IREdge: self.dst = dst self.id = properties['id'] self.executionProperties = properties['executionProperties'] - self.coder = self.executionProperties['Coder'] + self.encoderFactory = self.executionProperties['Encoder'] + self.decoderFactory = self.executionProperties['Decoder'] @property def dot(self): src = self.src @@ -318,7 +319,7 @@ class IREdge: dst = dst.internalDstFor(self.id) except: pass - label = '{}
{}
{}'.format(self.id, edgePropertiesString(self.executionProperties), self.coder) + label = '{}
{}
{}
{}
'.format(self.id, edgePropertiesString(self.executionProperties), self.encoderFactory, self.decoderFactory) return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(src.oneVertex.idx, dst.oneVertex.idx, src.logicalEnd, dst.logicalEnd, label) @@ -328,10 +329,11 @@ class StageEdge: self.dst = dst.internalDAG.vertices[properties['dstVertex']] self.runtimeEdgeId = properties['runtimeEdgeId'] self.executionProperties = properties['executionProperties'] - self.coder = self.executionProperties['Coder'] + self.encoderFactory = self.executionProperties['Encoder'] + self.decoderFactory = self.executionProperties['Decoder'] @property def dot(self): - label = '{}
{}
{}'.format(self.runtimeEdgeId, edgePropertiesString(self.executionProperties), self.coder) + label = '{}
{}
{}
{}
'.format(self.runtimeEdgeId, edgePropertiesString(self.executionProperties), self.encoderFactory, self.decoderFactory) return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(self.src.oneVertex.idx, self.dst.oneVertex.idx, self.src.logicalEnd, self.dst.logicalEnd, label) @@ -341,10 +343,11 @@ class RuntimeEdge: self.dst = dst self.runtimeEdgeId = properties['runtimeEdgeId'] self.executionProperties = properties['executionProperties'] - self.coder = self.executionProperties['Coder'] + self.encoderFactory = self.executionProperties['Encoder'] + self.decoderFactory = self.executionProperties['Decoder'] @property def dot(self): - label = '{}
{}
{}'.format(self.runtimeEdgeId, edgePropertiesString(self.executionProperties), self.coder) + label = '{}
{}
{}
{}
'.format(self.runtimeEdgeId, edgePropertiesString(self.executionProperties), self.encoderFactory, self.decoderFactory) return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(self.src.oneVertex.idx, self.dst.oneVertex.idx, self.src.logicalEnd, self.dst.logicalEnd, label) diff --git a/common/src/main/java/edu/snu/nemo/common/coder/BytesCoder.java b/common/src/main/java/edu/snu/nemo/common/coder/BytesCoder.java deleted file mode 100644 index 3e467f5..0000000 --- a/common/src/main/java/edu/snu/nemo/common/coder/BytesCoder.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (C) 2018 Seoul National University - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package edu.snu.nemo.common.coder; - -import java.io.*; - -/** - * A {@link Coder} which is used for an array of bytes. - */ -public final class BytesCoder implements Coder { - - /** - * Constructor. - */ - public BytesCoder() { - } - - @Override - public void encode(final byte[] value, final OutputStream outStream) throws IOException { - try (final DataOutputStream dataOutputStream = new DataOutputStream(outStream)) { - dataOutputStream.writeInt(value.length); // Write the size of this byte array. - dataOutputStream.write(value); - } - } - - @Override - public byte[] decode(final InputStream inStream) throws IOException { - // If the inStream is closed well in upper level, it is okay to not close this stream - // because the DataInputStream itself will not contain any extra information. 
- // (when we close this stream, the inStream will be closed together.) - final DataInputStream dataInputStream = new DataInputStream(inStream); - final int bytesToRead = dataInputStream.readInt(); - final byte[] bytes = new byte[bytesToRead]; // Read the size of this byte array. - final int readBytes = dataInputStream.read(bytes); - if (bytesToRead != readBytes) { - throw new IOException("Have to read " + bytesToRead + " but read only " + readBytes + " bytes."); - } - return bytes; - } -} diff --git a/common/src/main/java/edu/snu/nemo/common/coder/Coder.java b/common/src/main/java/edu/snu/nemo/common/coder/Coder.java deleted file mode 100644 index 49c56d1..0000000 --- a/common/src/main/java/edu/snu/nemo/common/coder/Coder.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (C) 2018 Seoul National University - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package edu.snu.nemo.common.coder; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; - -/** - * A coder object encodes or decodes values of type {@code T} into byte streams. - * - * @param element type. - */ -public interface Coder extends Serializable { - /** - * Encodes the given value onto the specified output stream. - * It have to be able to encode the given stream consequently by calling this method repeatedly. 
- * Because the user can want to keep a single output stream and continuously concatenate elements, - * the output stream should not be closed. - * - * @param element the element to be encoded - * @param outStream the stream on which encoded bytes are written - * @throws IOException if fail to encode - */ - void encode(T element, OutputStream outStream) throws IOException; - - /** - * Decodes the a value from the given input stream. - * It have to be able to decode the given stream consequently by calling this method repeatedly. - * Because there are many elements in the input stream, the stream should not be closed. - * - * @param inStream the stream from which bytes are read - * @return the decoded element - * @throws IOException if fail to decode - */ - T decode(InputStream inStream) throws IOException; - - /** - * Dummy coder. - */ - Coder DUMMY_CODER = new DummyCoder(); - - /** - * Dummy coder implementation which is not supposed to be used. - */ - final class DummyCoder implements Coder { - - @Override - public void encode(final Object value, final OutputStream outStream) { - throw new RuntimeException("DummyCoder is not supposed to be used."); - } - - @Override - public Object decode(final InputStream inStream) { - throw new RuntimeException("DummyCoder is not supposed to be used."); - } - - @Override - public String toString() { - return "DUMMY_CODER"; - } - } -} diff --git a/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java new file mode 100644 index 0000000..a93d843 --- /dev/null +++ b/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java @@ -0,0 +1,91 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.common.coder; + +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; + +/** + * A decoder factory object which generates decoders that decode values of type {@code T} into byte streams. + * To avoid to generate instance-based coder such as Spark serializer for every decoding, + * user need to instantiate a decoder instance and use it. + * + * @param element type. + */ +public interface DecoderFactory extends Serializable { + + /** + * Get a decoder instance. + * + * @param inputStream the input stream to decode. + * @return the decoder instance. + * @throws IOException if fail to get the instance. + */ + Decoder create(InputStream inputStream) throws IOException; + + /** + * Interface of Decoder. + * + * @param element type. + */ + interface Decoder extends Serializable { + + /** + * Decodes the a value from the given input stream. + * It have to be able to decode the given stream consequently by calling this method repeatedly. + * Because there are many elements in the input stream, the stream should not be closed. + * + * @return the decoded element + * @throws IOException if fail to decode + */ + T decode() throws IOException; + } + + /** + * Dummy coder factory. + */ + DecoderFactory DUMMY_DECODER_FACTORY = new DummyDecoderFactory(); + + /** + * Dummy coder factory implementation which is not supposed to be used. + */ + final class DummyDecoderFactory implements DecoderFactory { + + private final Decoder dummyDecoder = new DummyDecoder(); + + /** + * DummyDecoder. 
+ */ + private final class DummyDecoder implements Decoder { + + @Override + public Object decode() { + throw new RuntimeException("DummyDecoder is not supposed to be used."); + } + } + + @Override + public Decoder create(final InputStream inputStream) { + return dummyDecoder; + } + + @Override + public String toString() { + return "DUMMY_DECODER_FACTORY"; + } + } +} diff --git a/common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java new file mode 100644 index 0000000..d63fafb --- /dev/null +++ b/common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java @@ -0,0 +1,92 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.common.coder; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.Serializable; + +/** + * A encoder factory object which generates encoders that encode values of type {@code T} into byte streams. + * To avoid to generate instance-based coder such as Spark serializer for every encoding, + * user need to explicitly instantiate an encoder instance and use it. + * + * @param element type. + */ +public interface EncoderFactory extends Serializable { + + /** + * Get an encoder instance. + * + * @param outputStream the stream on which encoded bytes are written + * @return the encoder instance. + * @throws IOException if fail to get the instance. 
+ */ + Encoder create(OutputStream outputStream) throws IOException; + + /** + * Interface of Encoder. + * + * @param element type. + */ + interface Encoder extends Serializable { + + /** + * Encodes the given value onto the specified output stream. + * It have to be able to encode the given stream consequently by calling this method repeatedly. + * Because the user can want to keep a single output stream and continuously concatenate elements, + * the output stream should not be closed. + * + * @param element the element to be encoded + * @throws IOException if fail to encode + */ + void encode(T element) throws IOException; + } + + /** + * Dummy encoder factory. + */ + EncoderFactory DUMMY_ENCODER_FACTORY = new DummyEncoderFactory(); + + /** + * Dummy encoder factory implementation which is not supposed to be used. + */ + final class DummyEncoderFactory implements EncoderFactory { + + private final Encoder dummyEncoder = new DummyEncoder(); + + /** + * DummyEncoder. + */ + private final class DummyEncoder implements Encoder { + + @Override + public void encode(final Object element) { + throw new RuntimeException("DummyEncoder is not supposed to be used."); + } + } + + @Override + public Encoder create(final OutputStream outputStream) { + return dummyEncoder; + } + + @Override + public String toString() { + return "DUMMY_ENCODER_FACTORY"; + } + } +} diff --git a/common/src/main/java/edu/snu/nemo/common/coder/IntCoder.java b/common/src/main/java/edu/snu/nemo/common/coder/IntCoder.java deleted file mode 100644 index face994..0000000 --- a/common/src/main/java/edu/snu/nemo/common/coder/IntCoder.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (C) 2018 Seoul National University - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package edu.snu.nemo.common.coder; - -import java.io.*; - -/** - * A {@link Coder} which is used for an integer. - */ -public final class IntCoder implements Coder { - - /** - * A private constructor. - */ - private IntCoder() { - } - - /** - * Static initializer of the coder. - */ - public static IntCoder of() { - return new IntCoder(); - } - - @Override - public void encode(final Integer value, final OutputStream outStream) throws IOException { - final DataOutputStream dataOutputStream = new DataOutputStream(outStream); - dataOutputStream.writeInt(value); - } - - @Override - public Integer decode(final InputStream inStream) throws IOException { - // If the inStream is closed well in upper level, it is okay to not close this stream - // because the DataInputStream itself will not contain any extra information. - // (when we close this stream, the inStream will be closed together.) - final DataInputStream dataInputStream = new DataInputStream(inStream); - return dataInputStream.readInt(); - } -} diff --git a/common/src/main/java/edu/snu/nemo/common/coder/IntDecoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/IntDecoderFactory.java new file mode 100644 index 0000000..d79ec35 --- /dev/null +++ b/common/src/main/java/edu/snu/nemo/common/coder/IntDecoderFactory.java @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.common.coder; + +import java.io.*; + +/** + * A {@link DecoderFactory} which is used for an integer. + */ +public final class IntDecoderFactory implements DecoderFactory { + + private static final IntDecoderFactory INT_DECODER_FACTORY = new IntDecoderFactory(); + + /** + * A private constructor. + */ + private IntDecoderFactory() { + // do nothing. + } + + /** + * Static initializer of the coder. + */ + public static IntDecoderFactory of() { + return INT_DECODER_FACTORY; + } + + @Override + public Decoder create(final InputStream inputStream) { + return new IntDecoder(inputStream); + } + + /** + * IntDecoder. + */ + private final class IntDecoder implements Decoder { + + private final DataInputStream inputStream; + + /** + * Constructor. + * + * @param inputStream the input stream to decode. + */ + private IntDecoder(final InputStream inputStream) { + // If the inputStream is closed well in upper level, it is okay to not close this stream + // because the DataInputStream itself will not contain any extra information. + // (when we close this stream, the input will be closed together.) 
+ this.inputStream = new DataInputStream(inputStream); + } + + @Override + public Integer decode() throws IOException { + return inputStream.readInt(); + } + } +} diff --git a/common/src/main/java/edu/snu/nemo/common/coder/IntEncoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/IntEncoderFactory.java new file mode 100644 index 0000000..0a40de5 --- /dev/null +++ b/common/src/main/java/edu/snu/nemo/common/coder/IntEncoderFactory.java @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.common.coder; + +import java.io.*; + +/** + * A {@link EncoderFactory} which is used for an integer. + */ +public final class IntEncoderFactory implements EncoderFactory { + + private static final IntEncoderFactory INT_ENCODER_FACTORY = new IntEncoderFactory(); + + /** + * A private constructor. + */ + private IntEncoderFactory() { + // do nothing. + } + + /** + * Static initializer of the coder. + */ + public static IntEncoderFactory of() { + return INT_ENCODER_FACTORY; + } + + @Override + public Encoder create(final OutputStream outputStream) { + return new IntEncoder(outputStream); + } + + /** + * IntEncoder. + */ + private final class IntEncoder implements Encoder { + + private final DataOutputStream outputStream; + + /** + * Constructor. + * + * @param outputStream the output stream to store the encoded bytes. 
+ */ + private IntEncoder(final OutputStream outputStream) { + // If the outputStream is closed well in upper level, it is okay to not close this stream + // because the DataOutputStream itself will not contain any extra information. + // (when we close this stream, the output will be closed together.) + this.outputStream = new DataOutputStream(outputStream); + } + + @Override + public void encode(final Integer value) throws IOException { + outputStream.writeInt(value); + } + } +} diff --git a/common/src/main/java/edu/snu/nemo/common/coder/PairCoder.java b/common/src/main/java/edu/snu/nemo/common/coder/PairCoder.java deleted file mode 100644 index 3ad3552..0000000 --- a/common/src/main/java/edu/snu/nemo/common/coder/PairCoder.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright (C) 2018 Seoul National University - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package edu.snu.nemo.common.coder; - -import edu.snu.nemo.common.Pair; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -/** - * A Coder for {@link edu.snu.nemo.common.Pair}. Reference: KvCoder in BEAM. - * @param type for the left coder. - * @param type for the right coder. - */ -public final class PairCoder implements Coder> { - private final Coder leftCoder; - private final Coder rightCoder; - - /** - * Private constructor of PairCoder class. - * @param leftCoder coder for right element. - * @param rightCoder coder for right element. 
- */ - private PairCoder(final Coder leftCoder, final Coder rightCoder) { - this.leftCoder = leftCoder; - this.rightCoder = rightCoder; - } - - /** - * static initializer of the class. - * @param leftCoder left coder. - * @param rightCoder right coder. - * @param type of the left element. - * @param type of the right element. - * @return the new PairCoder. - */ - public static PairCoder of(final Coder leftCoder, final Coder rightCoder) { - return new PairCoder<>(leftCoder, rightCoder); - } - - /** - * @return the left coder. - */ - Coder getLeftCoder() { - return leftCoder; - } - - /** - * @return the right coder. - */ - Coder getRightCoder() { - return rightCoder; - } - - @Override - public void encode(final Pair pair, final OutputStream outStream) throws IOException { - if (pair == null) { - throw new IOException("cannot encode a null pair"); - } - leftCoder.encode(pair.left(), outStream); - rightCoder.encode(pair.right(), outStream); - } - - @Override - public Pair decode(final InputStream inStream) throws IOException { - final A key = leftCoder.decode(inStream); - final B value = rightCoder.decode(inStream); - return Pair.of(key, value); - } -} diff --git a/common/src/main/java/edu/snu/nemo/common/coder/PairDecoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/PairDecoderFactory.java new file mode 100644 index 0000000..9cd0fb1 --- /dev/null +++ b/common/src/main/java/edu/snu/nemo/common/coder/PairDecoderFactory.java @@ -0,0 +1,95 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.common.coder; + +import edu.snu.nemo.common.Pair; + +import java.io.IOException; +import java.io.InputStream; + +/** + * An DecoderFactory for {@link Pair}. Reference: KvCoder in BEAM. + * @param type for the left coder. + * @param type for the right coder. + */ +public final class PairDecoderFactory implements DecoderFactory> { + private final DecoderFactory leftDecoderFactory; + private final DecoderFactory rightDecoderFactory; + + /** + * Private constructor of PairDecoderFactory class. + * + * @param leftDecoderFactory coder for right element. + * @param rightDecoderFactory coder for right element. + */ + private PairDecoderFactory(final DecoderFactory leftDecoderFactory, + final DecoderFactory rightDecoderFactory) { + this.leftDecoderFactory = leftDecoderFactory; + this.rightDecoderFactory = rightDecoderFactory; + } + + /** + * static initializer of the class. + * + * @param leftDecoderFactory left coder. + * @param rightDecoderFactory right coder. + * @param type of the left element. + * @param type of the right element. + * @return the new PairDecoderFactory. + */ + public static PairDecoderFactory of(final DecoderFactory leftDecoderFactory, + final DecoderFactory rightDecoderFactory) { + return new PairDecoderFactory<>(leftDecoderFactory, rightDecoderFactory); + } + + @Override + public Decoder> create(final InputStream inputStream) throws IOException { + return new PairDecoder<>(inputStream, leftDecoderFactory, rightDecoderFactory); + } + + /** + * PairDecoder. + * @param type for the left coder. + * @param type for the right coder. + */ + private final class PairDecoder implements Decoder> { + + private final Decoder leftDecoder; + private final Decoder rightDecoder; + + /** + * Constructor. + * + * @param inputStream the input stream to decode. + * @param leftDecoderFactory the actual decoder to use for left elements. 
+ * @param rightDecoderFactory the actual decoder to use for right elements. + * @throws IOException if fail to instantiate coders. + */ + private PairDecoder(final InputStream inputStream, + final DecoderFactory leftDecoderFactory, + final DecoderFactory rightDecoderFactory) throws IOException { + this.leftDecoder = leftDecoderFactory.create(inputStream); + this.rightDecoder = rightDecoderFactory.create(inputStream); + } + + @Override + public Pair decode() throws IOException { + final T1 key = leftDecoder.decode(); + final T2 value = rightDecoder.decode(); + return Pair.of(key, value); + } + } +} diff --git a/common/src/main/java/edu/snu/nemo/common/coder/PairEncoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/PairEncoderFactory.java new file mode 100644 index 0000000..0837b57 --- /dev/null +++ b/common/src/main/java/edu/snu/nemo/common/coder/PairEncoderFactory.java @@ -0,0 +1,96 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.common.coder; + +import edu.snu.nemo.common.Pair; +import java.io.IOException; +import java.io.OutputStream; + +/** + * An EncoderFactory for {@link Pair}. Reference: KvCoder in BEAM. + * @param type for the left coder. + * @param type for the right coder. 
+ */ +public final class PairEncoderFactory implements EncoderFactory> { + private final EncoderFactory leftEncoderFactory; + private final EncoderFactory rightEncoderFactory; + + /** + * Private constructor of PairEncoderFactory class. + * + * @param leftEncoderFactory coder for left element. + * @param rightEncoderFactory coder for right element. + */ + private PairEncoderFactory(final EncoderFactory leftEncoderFactory, + final EncoderFactory rightEncoderFactory) { + this.leftEncoderFactory = leftEncoderFactory; + this.rightEncoderFactory = rightEncoderFactory; + } + + /** + * Static factory method of the class. + * + * @param leftEncoderFactory left coder. + * @param rightEncoderFactory right coder. + * @param type of the left element. + * @param type of the right element. + * @return the new PairEncoderFactory. + */ + public static PairEncoderFactory of(final EncoderFactory leftEncoderFactory, + final EncoderFactory rightEncoderFactory) { + return new PairEncoderFactory<>(leftEncoderFactory, rightEncoderFactory); + } + + @Override + public Encoder> create(final OutputStream outputStream) throws IOException { + return new PairEncoder<>(outputStream, leftEncoderFactory, rightEncoderFactory); + } + + /** + * PairEncoder. + * @param type for the left coder. + * @param type for the right coder. + */ + private final class PairEncoder implements Encoder> { + + private final Encoder leftEncoder; + private final Encoder rightEncoder; + + /** + * Constructor. + * + * @param outputStream the output stream to store the encoded bytes. + * @param leftEncoderFactory the actual encoder to use for left elements. + * @param rightEncoderFactory the actual encoder to use for right elements. + * @throws IOException if the coders cannot be instantiated.
+ */ + private PairEncoder(final OutputStream outputStream, + final EncoderFactory leftEncoderFactory, + final EncoderFactory rightEncoderFactory) throws IOException { + this.leftEncoder = leftEncoderFactory.create(outputStream); + this.rightEncoder = rightEncoderFactory.create(outputStream); + } + + @Override + public void encode(final Pair pair) throws IOException { + if (pair == null) { + throw new IOException("cannot encode a null pair"); + } + leftEncoder.encode(pair.left()); + rightEncoder.encode(pair.right()); + } + } +} diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java index 6402319..441949e 100644 --- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java +++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java @@ -23,6 +23,7 @@ import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty; public final class CompressionProperty extends EdgeExecutionProperty { /** * Constructor. + * * @param value value of the execution property. */ private CompressionProperty(final Value value) { @@ -31,6 +32,7 @@ public final class CompressionProperty extends EdgeExecutionProperty { +public final class DecoderProperty extends EdgeExecutionProperty { /** * Constructor. * * @param value value of the execution property. */ - private CoderProperty(final Coder value) { + private DecoderProperty(final DecoderFactory value) { super(value); } @@ -37,7 +37,7 @@ public final class CoderProperty extends EdgeExecutionProperty { * @param value value of the new execution property. * @return the newly created execution property. 
*/ - public static CoderProperty of(final Coder value) { - return new CoderProperty(value); + public static DecoderProperty of(final DecoderFactory value) { + return new DecoderProperty(value); } } diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DecompressionProperty.java similarity index 72% copy from common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java copy to common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DecompressionProperty.java index aae1c6e..bfd09e0 100644 --- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java +++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DecompressionProperty.java @@ -15,19 +15,19 @@ */ package edu.snu.nemo.common.ir.edge.executionproperty; -import edu.snu.nemo.common.coder.Coder; import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty; /** - * Coder ExecutionProperty. + * Decompression ExecutionProperty. + * It shares the value with {@link CompressionProperty}. */ -public final class CoderProperty extends EdgeExecutionProperty { +public final class DecompressionProperty extends EdgeExecutionProperty { /** * Constructor. * * @param value value of the execution property. */ - private CoderProperty(final Coder value) { + private DecompressionProperty(final CompressionProperty.Value value) { super(value); } @@ -37,7 +37,7 @@ public final class CoderProperty extends EdgeExecutionProperty { * @param value value of the new execution property. * @return the newly created execution property. 
*/ - public static CoderProperty of(final Coder value) { - return new CoderProperty(value); + public static DecompressionProperty of(final CompressionProperty.Value value) { + return new DecompressionProperty(value); } } diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/EncoderProperty.java similarity index 75% rename from common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java rename to common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/EncoderProperty.java index aae1c6e..f214f17 100644 --- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java +++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/EncoderProperty.java @@ -15,19 +15,19 @@ */ package edu.snu.nemo.common.ir.edge.executionproperty; -import edu.snu.nemo.common.coder.Coder; +import edu.snu.nemo.common.coder.EncoderFactory; import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty; /** - * Coder ExecutionProperty. + * EncoderFactory ExecutionProperty. */ -public final class CoderProperty extends EdgeExecutionProperty { +public final class EncoderProperty extends EdgeExecutionProperty { /** * Constructor. * * @param value value of the execution property. */ - private CoderProperty(final Coder value) { + private EncoderProperty(final EncoderFactory value) { super(value); } @@ -37,7 +37,7 @@ public final class CoderProperty extends EdgeExecutionProperty { * @param value value of the new execution property. * @return the newly created execution property. 
*/ - public static CoderProperty of(final Coder value) { - return new CoderProperty(value); + public static EncoderProperty of(final EncoderFactory value) { + return new EncoderProperty(value); } } diff --git a/common/src/test/java/edu/snu/nemo/common/ir/LoopVertexTest.java b/common/src/test/java/edu/snu/nemo/common/ir/LoopVertexTest.java index 4650aae..558a9f7 100644 --- a/common/src/test/java/edu/snu/nemo/common/ir/LoopVertexTest.java +++ b/common/src/test/java/edu/snu/nemo/common/ir/LoopVertexTest.java @@ -15,7 +15,6 @@ */ package edu.snu.nemo.common.ir; -import edu.snu.nemo.common.coder.Coder; import edu.snu.nemo.common.dag.DAG; import edu.snu.nemo.common.ir.edge.IREdge; import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty; diff --git a/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java b/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java index f7eaad7..1ca18ff 100644 --- a/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java +++ b/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java @@ -15,15 +15,10 @@ */ package edu.snu.nemo.common.ir.executionproperty; -import edu.snu.nemo.common.coder.Coder; +import edu.snu.nemo.common.coder.DecoderFactory; +import edu.snu.nemo.common.coder.EncoderFactory; import edu.snu.nemo.common.ir.edge.IREdge; -import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty; -import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty; -import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty; -import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty; -import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty; -import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap; -import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty; +import 
edu.snu.nemo.common.ir.edge.executionproperty.*; import edu.snu.nemo.common.ir.vertex.IRVertex; import edu.snu.nemo.common.ir.vertex.OperatorVertex; import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty; @@ -66,8 +61,10 @@ public class ExecutionPropertyMapTest { assertEquals(DataStoreProperty.Value.MemoryStore, edgeMap.get(DataStoreProperty.class).get()); edgeMap.put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull)); assertEquals(DataFlowModelProperty.Value.Pull, edgeMap.get(DataFlowModelProperty.class).get()); - edgeMap.put(CoderProperty.of(Coder.DUMMY_CODER)); - assertEquals(Coder.DUMMY_CODER, edgeMap.get(CoderProperty.class).get()); + edgeMap.put(EncoderProperty.of(EncoderFactory.DUMMY_ENCODER_FACTORY)); + assertEquals(EncoderFactory.DUMMY_ENCODER_FACTORY, edgeMap.get(EncoderProperty.class).get()); + edgeMap.put(DecoderProperty.of(DecoderFactory.DUMMY_DECODER_FACTORY)); + assertEquals(DecoderFactory.DUMMY_DECODER_FACTORY, edgeMap.get(DecoderProperty.class).get()); edgeMap.remove(DataFlowModelProperty.class); assertFalse(edgeMap.get(DataFlowModelProperty.class).isPresent()); diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java index bf3253b..b85057b 100644 --- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java +++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java @@ -15,9 +15,12 @@ */ package edu.snu.nemo.compiler.frontend.beam; -import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty; +import edu.snu.nemo.common.Pair; +import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty; +import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty; import edu.snu.nemo.common.ir.vertex.transform.Transform; -import 
edu.snu.nemo.compiler.frontend.beam.coder.BeamCoder; +import edu.snu.nemo.compiler.frontend.beam.coder.BeamDecoderFactory; +import edu.snu.nemo.compiler.frontend.beam.coder.BeamEncoderFactory; import edu.snu.nemo.common.dag.DAGBuilder; import edu.snu.nemo.common.ir.edge.IREdge; import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty; @@ -55,10 +58,11 @@ public final class NemoPipelineVisitor extends Pipeline.PipelineVisitor.Defaults private final PipelineOptions options; // loopVertexStack keeps track of where the beam program is: whether it is inside a composite transform or it is not. private final Stack loopVertexStack; - private final Map pValueToCoder; + private final Map> pValueToCoder; /** * Constructor of the BEAM Visitor. + * * @param builder DAGBuilder to build the DAG with. * @param options Pipeline options. */ @@ -96,10 +100,11 @@ public final class NemoPipelineVisitor extends Pipeline.PipelineVisitor.Defaults throw new UnsupportedOperationException(beamNode.toString()); } - final IRVertex irVertex = convertToVertex(beamNode, builder, pValueToVertex, pValueToCoder, options, - loopVertexStack); + final IRVertex irVertex = + convertToVertex(beamNode, builder, pValueToVertex, pValueToCoder, options, loopVertexStack); beamNode.getOutputs().values().stream().filter(v -> v instanceof PCollection).map(v -> (PCollection) v) - .forEach(output -> pValueToCoder.put(output, new BeamCoder(output.getCoder()))); + .forEach(output -> pValueToCoder.put(output, + Pair.of(new BeamEncoderFactory(output.getCoder()), new BeamDecoderFactory(output.getCoder())))); beamNode.getOutputs().values().forEach(output -> pValueToVertex.put(output, irVertex)); @@ -107,7 +112,9 @@ public final class NemoPipelineVisitor extends Pipeline.PipelineVisitor.Defaults .forEach(pValue -> { final IRVertex src = pValueToVertex.get(pValue); final IREdge edge = new IREdge(getEdgeCommunicationPattern(src, irVertex), src, irVertex); - 
edge.setProperty(CoderProperty.of(pValueToCoder.get(pValue))); + final Pair coderPair = pValueToCoder.get(pValue); + edge.setProperty(EncoderProperty.of(coderPair.left())); + edge.setProperty(DecoderProperty.of(coderPair.right())); edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor())); this.builder.connectVertices(edge); }); @@ -115,22 +122,24 @@ public final class NemoPipelineVisitor extends Pipeline.PipelineVisitor.Defaults /** * Convert Beam node to IR vertex. - * @param beamNode input beam node. - * @param builder the DAG builder to add the vertex to. - * @param pValueToVertex PValue to Vertex map. - * @param pValueToCoder PValue to Coder map. - * @param options pipeline options. + * + * @param beamNode input beam node. + * @param builder the DAG builder to add the vertex to. + * @param pValueToVertex PValue to Vertex map. + * @param pValueToCoder PValue to EncoderFactory and DecoderFactory map. + * @param options pipeline options. * @param loopVertexStack Stack to get the current loop vertex that the operator vertex will be assigned to. - * @param input type. - * @param output type. + * @param input type. + * @param output type. * @return newly created vertex. 
*/ - private static IRVertex convertToVertex(final TransformHierarchy.Node beamNode, - final DAGBuilder builder, - final Map pValueToVertex, - final Map pValueToCoder, - final PipelineOptions options, - final Stack loopVertexStack) { + private static IRVertex + convertToVertex(final TransformHierarchy.Node beamNode, + final DAGBuilder builder, + final Map pValueToVertex, + final Map> pValueToCoder, + final PipelineOptions options, + final Stack loopVertexStack) { final PTransform beamTransform = beamNode.getTransform(); final IRVertex irVertex; if (beamTransform instanceof Read.Bounded) { @@ -147,13 +156,14 @@ public final class NemoPipelineVisitor extends Pipeline.PipelineVisitor.Defaults pValueToVertex.put(view.getView(), irVertex); builder.addVertex(irVertex, loopVertexStack); // Coders for outgoing edges in CreateViewTransform. - // Since outgoing PValues for CreateViewTransform is PCollectionView, we cannot use PCollection::getCoder to - // obtain coders. + // Since the outgoing PValues of CreateViewTransform are PCollectionViews, + // we cannot use PCollection::getCoder to obtain coders. final Coder beamInputCoder = beamNode.getInputs().values().stream() .filter(v -> v instanceof PCollection).map(v -> (PCollection) v).findFirst() .orElseThrow(() -> new RuntimeException("No inputs provided to " + beamNode.getFullName())).getCoder(); beamNode.getOutputs().values().stream() - .forEach(output -> pValueToCoder.put(output, getCoderForView(view.getView().getViewFn(), beamInputCoder))); + .forEach(output -> + pValueToCoder.put(output, getCoderPairForView(view.getView().getViewFn(), beamInputCoder))); } else if (beamTransform instanceof Window) { final Window window = (Window) beamTransform; final WindowTransform transform = new WindowTransform(window.getWindowFn()); @@ -187,35 +197,40 @@ public final class NemoPipelineVisitor extends Pipeline.PipelineVisitor.Defaults /** * Connect side inputs to the vertex. - * @param builder the DAG builder to add the vertex to.
- * @param sideInputs side inputs. + * + * @param builder the DAG builder to add the vertex to. + * @param sideInputs side inputs. * @param pValueToVertex PValue to Vertex map. - * @param pValueToCoder PValue to Coder map. - * @param irVertex wrapper for a user operation in the IR. (Where the side input is headed to) + * @param pValueToCoder PValue to Encoder/Decoder factory map. + * @param irVertex wrapper for a user operation in the IR. (Where the side input is headed to) */ private static void connectSideInputs(final DAGBuilder builder, final List> sideInputs, final Map pValueToVertex, - final Map pValueToCoder, + final Map> pValueToCoder, final IRVertex irVertex) { sideInputs.stream().filter(pValueToVertex::containsKey) .forEach(pValue -> { final IRVertex src = pValueToVertex.get(pValue); final IREdge edge = new IREdge(getEdgeCommunicationPattern(src, irVertex), src, irVertex, true); - edge.setProperty(CoderProperty.of(pValueToCoder.get(pValue))); + final Pair coder = pValueToCoder.get(pValue); + edge.setProperty(EncoderProperty.of(coder.left())); + edge.setProperty(DecoderProperty.of(coder.right())); edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor())); builder.connectVertices(edge); }); } /** - * Get appropriate coder for {@link PCollectionView}. - * @param viewFn {@link ViewFn} from the corresponding {@link View.CreatePCollectionView} transform + * Get appropriate encoder and decoder pair for {@link PCollectionView}. 
+ * + * @param viewFn {@link ViewFn} from the corresponding {@link View.CreatePCollectionView} transform * @param beamInputCoder Beam {@link Coder} for input value to {@link View.CreatePCollectionView} - * @return appropriate {@link BeamCoder} + * @return appropriate pair of {@link BeamEncoderFactory} and {@link BeamDecoderFactory} */ - private static BeamCoder getCoderForView(final ViewFn viewFn, final Coder beamInputCoder) { + private static Pair getCoderPairForView(final ViewFn viewFn, + final Coder beamInputCoder) { final Coder beamOutputCoder; if (viewFn instanceof PCollectionViews.IterableViewFn) { beamOutputCoder = IterableCoder.of(beamInputCoder); @@ -232,11 +247,12 @@ public final class NemoPipelineVisitor extends Pipeline.PipelineVisitor.Defaults } else { throw new UnsupportedOperationException("Unsupported viewFn: " + viewFn.getClass()); } - return new BeamCoder(beamOutputCoder); + return Pair.of(new BeamEncoderFactory(beamOutputCoder), new BeamDecoderFactory(beamOutputCoder)); } /** * Get the edge type for the src, dst vertex. + * * @param src source vertex. * @param dst destination vertex. * @return the appropriate edge type. diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamCoder.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamCoder.java deleted file mode 100644 index 56f6714..0000000 --- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamCoder.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright (C) 2018 Seoul National University - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package edu.snu.nemo.compiler.frontend.beam.coder; - -import edu.snu.nemo.common.coder.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.VoidCoder; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -/** - * {@link Coder} from {@link org.apache.beam.sdk.coders.Coder}. - * @param element type. - */ -public final class BeamCoder implements Coder { - private final org.apache.beam.sdk.coders.Coder beamCoder; - - /** - * Constructor of BeamCoder. - * @param beamCoder actual Beam coder to use. 
- */ - public BeamCoder(final org.apache.beam.sdk.coders.Coder beamCoder) { - this.beamCoder = beamCoder; - } - - @Override - public void encode(final T value, final OutputStream outStream) throws IOException { - if (beamCoder instanceof VoidCoder) { - outStream.write(0); - return; - } - try { - beamCoder.encode(value, outStream); - } catch (final CoderException e) { - throw new IOException(e); - } - } - - @Override - public T decode(final InputStream inStream) throws IOException { - if (beamCoder instanceof VoidCoder && inStream.read() == -1) { - throw new IOException("End of stream reached"); - } - try { - return beamCoder.decode(inStream); - } catch (final CoderException e) { - throw new IOException(e); - } - } - - @Override - public String toString() { - return beamCoder.toString(); - } -} diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java new file mode 100644 index 0000000..7ebea38 --- /dev/null +++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java @@ -0,0 +1,147 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package edu.snu.nemo.compiler.frontend.beam.coder; + +import edu.snu.nemo.common.coder.DecoderFactory; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.VoidCoder; + +import java.io.IOException; +import java.io.InputStream; + +/** + * {@link DecoderFactory} from {@link org.apache.beam.sdk.coders.Coder}. + * @param the type of element to decode. + */ +public final class BeamDecoderFactory implements DecoderFactory { + private final Coder beamCoder; + + /** + * Constructor of BeamDecoderFactory. + * + * @param beamCoder actual Beam coder to use. + */ + public BeamDecoderFactory(final Coder beamCoder) { + this.beamCoder = beamCoder; + } + + @Override + public Decoder create(final InputStream inputStream) { + if (beamCoder instanceof VoidCoder) { + return new BeamVoidDecoder<>(inputStream, beamCoder); + } else { + return new BeamDecoder<>(inputStream, beamCoder); + } + } + + /** + * Abstract class for Beam Decoder. + * @param the type of element to decode. + */ + private abstract class BeamAbstractDecoder implements Decoder { + + private final Coder beamCoder; + private final InputStream inputStream; + + /** + * Constructor. + * + * @param inputStream the input stream to decode. + * @param beamCoder the actual beam coder to use. + */ + protected BeamAbstractDecoder(final InputStream inputStream, + final Coder beamCoder) { + this.inputStream = inputStream; + this.beamCoder = beamCoder; + } + + /** + * Decode the actual data internally. + * + * @return the decoded data. + * @throws IOException if decoding fails. + */ + protected T2 decodeInternal() throws IOException { + try { + return beamCoder.decode(inputStream); + } catch (final CoderException e) { + throw new IOException(e); + } + } + + /** + * @return the input stream. + */ + protected InputStream getInputStream() { + return inputStream; + } + } + + /** + * Beam Decoder for non-void objects. 
+ * @param the type of element to decode.
+ */ + private final class BeamDecoder extends BeamAbstractDecoder { + + /** + * Constructor. + * + * @param inputStream the input stream to decode. + * @param beamCoder the actual beam coder to use. + */ + private BeamDecoder(final InputStream inputStream, + final Coder beamCoder) { + super(inputStream, beamCoder); + } + + @Override + public T2 decode() throws IOException { + return decodeInternal(); + } + } + + /** + * Beam Decoder for {@link VoidCoder}. + * @param the type of element to decode. + */ + private final class BeamVoidDecoder extends BeamAbstractDecoder { + + /** + * Constructor. + * + * @param inputStream the input stream to decode. + * @param beamCoder the actual beam coder to use. + */ + private BeamVoidDecoder(final InputStream inputStream, + final Coder beamCoder) { + super(inputStream, beamCoder); + } + + @Override + public T2 decode() throws IOException { + if (getInputStream().read() == -1) { + throw new IOException("End of stream reached"); + } + return decodeInternal(); + } + } + + @Override + public String toString() { + return beamCoder.toString(); + } +} diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java new file mode 100644 index 0000000..f67b96c --- /dev/null +++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java @@ -0,0 +1,112 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.compiler.frontend.beam.coder; + +import edu.snu.nemo.common.coder.EncoderFactory; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.VoidCoder; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * {@link EncoderFactory} from {@link Coder}. + * @param the type of element to encode. + */ +public final class BeamEncoderFactory implements EncoderFactory { + + private final Coder beamCoder; + + /** + * Constructor of BeamEncoderFactory. + * + * @param beamCoder actual Beam coder to use. + */ + public BeamEncoderFactory(final Coder beamCoder) { + this.beamCoder = beamCoder; + } + + @Override + public Encoder create(final OutputStream outputStream) { + if (beamCoder instanceof VoidCoder) { + return new BeamVoidEncoder<>(outputStream); + } else { + return new BeamEncoder<>(outputStream, beamCoder); + } + } + + /** + * Beam Encoder for non-void objects. + * + * @param the type of element to encode. + */ + private final class BeamEncoder implements Encoder { + + private final Coder beamCoder; + private final OutputStream outputStream; + + /** + * Constructor. + * + * @param outputStream the output stream to store the encoded bytes. + * @param beamCoder the actual beam coder to use.
+ */ + private BeamEncoder(final OutputStream outputStream, + final Coder beamCoder) { + this.outputStream = outputStream; + this.beamCoder = beamCoder; + } + + @Override + public void encode(final T2 element) throws IOException { + try { + beamCoder.encode(element, outputStream); + } catch (final CoderException e) { + throw new IOException(e); + } + } + } + + /** + * Beam Encoder for {@link VoidCoder}. + * + * @param the type of element to encode. + */ + private final class BeamVoidEncoder implements Encoder { + + private final OutputStream outputStream; + + /** + * Constructor. + * + * @param outputStream the output stream to store the encoded bytes. + */ + private BeamVoidEncoder(final OutputStream outputStream) { + this.outputStream = outputStream; + } + + @Override + public void encode(final T2 element) throws IOException { + outputStream.write(0); // emit 0 instead of null so that emitted elements can be counted. + } + } + + @Override + public String toString() { + return beamCoder.toString(); + } +} diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java index 9c63892..fa5d380 100644 --- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java +++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java @@ -20,7 +20,7 @@ import org.apache.beam.sdk.coders.AtomicCoder; import java.io.*; /** - * Coder for float[]. + * EncoderFactory for float[].
*/ public final class FloatArrayCoder extends AtomicCoder { /** diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/IntArrayCoder.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/IntArrayCoder.java index 80f53c0..c180bd6 100644 --- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/IntArrayCoder.java +++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/IntArrayCoder.java @@ -20,7 +20,7 @@ import org.apache.beam.sdk.coders.AtomicCoder; import java.io.*; /** - * Coder for int[]. + * EncoderFactory for int[]. */ public final class IntArrayCoder extends AtomicCoder { /** diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkCoder.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkCoder.java deleted file mode 100644 index 6317326..0000000 --- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkCoder.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (C) 2018 Seoul National University - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package edu.snu.nemo.compiler.frontend.spark.coder; - -import edu.snu.nemo.common.coder.Coder; -import org.apache.spark.serializer.Serializer; -import scala.reflect.ClassTag$; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -/** - * Kryo Spark Coder for serialization. - * @param type of the object to (de)serialize. - */ -public final class SparkCoder implements Coder { - private final Serializer serializer; - - /** - * Default constructor. - * @param serializer kryo serializer. - */ - public SparkCoder(final Serializer serializer) { - this.serializer = serializer; - } - - @Override - public void encode(final T element, final OutputStream outStream) throws IOException { - serializer.newInstance().serializeStream(outStream).writeObject(element, ClassTag$.MODULE$.Any()); - } - - @Override - public T decode(final InputStream inStream) throws IOException { - final T obj = (T) serializer.newInstance().deserializeStream(inStream).readObject(ClassTag$.MODULE$.Any()); - return obj; - } -} diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkDecoderFactory.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkDecoderFactory.java new file mode 100644 index 0000000..605b766 --- /dev/null +++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkDecoderFactory.java @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.compiler.frontend.spark.coder; + +import edu.snu.nemo.common.coder.DecoderFactory; +import org.apache.spark.serializer.DeserializationStream; +import org.apache.spark.serializer.Serializer; +import org.apache.spark.serializer.SerializerInstance; +import scala.reflect.ClassTag$; + +import java.io.InputStream; + +/** + * Spark DecoderFactory for deserialization. + * @param type of the object to deserialize. + */ +public final class SparkDecoderFactory implements DecoderFactory { + private final Serializer serializer; + + /** + * Default constructor. + * + * @param serializer Spark serializer. + */ + public SparkDecoderFactory(final Serializer serializer) { + this.serializer = serializer; + } + + @Override + public Decoder create(final InputStream inputStream) { + return new SparkDecoder<>(inputStream, serializer.newInstance()); + } + + /** + * SparkDecoder. + * @param type of the object to deserialize. + */ + private final class SparkDecoder implements Decoder { + + private final DeserializationStream in; + + /** + * Constructor. + * + * @param inputStream the input stream to decode. + * @param sparkSerializerInstance the actual spark serializer instance to use.
+ */ + private SparkDecoder(final InputStream inputStream, + final SerializerInstance sparkSerializerInstance) { + this.in = sparkSerializerInstance.deserializeStream(inputStream); + } + + @Override + public T2 decode() { + return (T2) in.readObject(ClassTag$.MODULE$.Any()); + } + } +} diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkEncoderFactory.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkEncoderFactory.java new file mode 100644 index 0000000..970a46c --- /dev/null +++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkEncoderFactory.java @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.compiler.frontend.spark.coder; + +import edu.snu.nemo.common.coder.EncoderFactory; +import org.apache.spark.serializer.SerializationStream; +import org.apache.spark.serializer.Serializer; +import org.apache.spark.serializer.SerializerInstance; +import scala.reflect.ClassTag$; + +import java.io.OutputStream; + +/** + * Spark EncoderFactory for serialization. + * @param type of the object to serialize. + */ +public final class SparkEncoderFactory implements EncoderFactory { + private final Serializer serializer; + + /** + * Default constructor. + * + * @param serializer Spark serializer. 
+ */ + public SparkEncoderFactory(final Serializer serializer) { + this.serializer = serializer; + } + + @Override + public Encoder create(final OutputStream outputStream) { + return new SparkEncoder<>(outputStream, serializer.newInstance()); + } + + /** + * SparkEncoder. + * @param type of the object to serialize. + */ + private final class SparkEncoder implements Encoder { + + private final SerializationStream out; + + /** + * Constructor. + * + * @param outputStream the output stream to store the encoded bytes. + * @param sparkSerializerInstance the actual spark serializer instance to use. + */ + private SparkEncoder(final OutputStream outputStream, + final SerializerInstance sparkSerializerInstance) { + this.out = sparkSerializerInstance.serializeStream(outputStream); + } + + @Override + public void encode(final T2 element) { + out.writeObject(element, ClassTag$.MODULE$.Any()); + } + } +} diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java index 6c4c395..08731a4 100644 --- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java +++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java @@ -19,14 +19,16 @@ import edu.snu.nemo.client.JobLauncher; import edu.snu.nemo.common.dag.DAG; import edu.snu.nemo.common.dag.DAGBuilder; import edu.snu.nemo.common.ir.edge.IREdge; -import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty; +import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty; +import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty; import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty; import edu.snu.nemo.common.ir.edge.executionproperty.KeyExtractorProperty; import edu.snu.nemo.common.ir.vertex.IRVertex; import 
edu.snu.nemo.common.ir.vertex.LoopVertex; import edu.snu.nemo.common.ir.vertex.OperatorVertex; import edu.snu.nemo.compiler.frontend.spark.SparkKeyExtractor; -import edu.snu.nemo.compiler.frontend.spark.coder.SparkCoder; +import edu.snu.nemo.compiler.frontend.spark.coder.SparkDecoderFactory; +import edu.snu.nemo.compiler.frontend.spark.coder.SparkEncoderFactory; import edu.snu.nemo.compiler.frontend.spark.transform.CollectTransform; import edu.snu.nemo.compiler.frontend.spark.transform.GroupByKeyTransform; import edu.snu.nemo.compiler.frontend.spark.transform.ReduceByKeyTransform; @@ -87,8 +89,10 @@ public final class SparkFrontendUtils { * @param type of the return data. * @return the data collected. */ - public static List collect(final DAG dag, final Stack loopVertexStack, - final IRVertex lastVertex, final Serializer serializer) { + public static List collect(final DAG dag, + final Stack loopVertexStack, + final IRVertex lastVertex, + final Serializer serializer) { final DAGBuilder builder = new DAGBuilder<>(dag); // save result in a temporary file @@ -100,7 +104,8 @@ public final class SparkFrontendUtils { final IREdge newEdge = new IREdge(getEdgeCommunicationPattern(lastVertex, collectVertex), lastVertex, collectVertex); - newEdge.setProperty(CoderProperty.of(new SparkCoder(serializer))); + newEdge.setProperty(EncoderProperty.of(new SparkEncoderFactory(serializer))); + newEdge.setProperty(DecoderProperty.of(new SparkDecoderFactory(serializer))); newEdge.setProperty(SPARK_KEY_EXTRACTOR_PROP); builder.connectVertices(newEdge); diff --git a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala index 5065f9c..14f764a 100644 --- a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala +++ 
b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala @@ -19,11 +19,11 @@ import java.util import edu.snu.nemo.common.dag.DAGBuilder import edu.snu.nemo.common.ir.edge.IREdge -import edu.snu.nemo.common.ir.edge.executionproperty.{CoderProperty, KeyExtractorProperty} +import edu.snu.nemo.common.ir.edge.executionproperty.{DecoderProperty, EncoderProperty, KeyExtractorProperty} import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty import edu.snu.nemo.common.ir.vertex.{IRVertex, LoopVertex, OperatorVertex} import edu.snu.nemo.compiler.frontend.spark.SparkKeyExtractor -import edu.snu.nemo.compiler.frontend.spark.coder.SparkCoder +import edu.snu.nemo.compiler.frontend.spark.coder.{SparkDecoderFactory, SparkEncoderFactory} import edu.snu.nemo.compiler.frontend.spark.core.SparkFrontendUtils import edu.snu.nemo.compiler.frontend.spark.transform.ReduceByKeyTransform import org.apache.hadoop.conf.Configuration @@ -72,7 +72,10 @@ final class PairRDDFunctions[K: ClassTag, V: ClassTag] protected[rdd] ( val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(self.lastVertex, reduceByKeyVertex), self.lastVertex, reduceByKeyVertex) newEdge.setProperty( - CoderProperty.of(new SparkCoder[Tuple2[K, V]](self.serializer)) + EncoderProperty.of(new SparkEncoderFactory[Tuple2[K, V]](self.serializer)) + .asInstanceOf[EdgeExecutionProperty[_ <: Serializable]]) + newEdge.setProperty( + DecoderProperty.of(new SparkDecoderFactory[Tuple2[K, V]](self.serializer)) .asInstanceOf[EdgeExecutionProperty[_ <: Serializable]]) newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor)) builder.connectVertices(newEdge) diff --git a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala index 135f698..ad58399 100644 --- 
a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala +++ b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala @@ -20,11 +20,11 @@ import java.util import edu.snu.nemo.client.JobLauncher import edu.snu.nemo.common.dag.{DAG, DAGBuilder} import edu.snu.nemo.common.ir.edge.IREdge -import edu.snu.nemo.common.ir.edge.executionproperty.{CoderProperty, KeyExtractorProperty} +import edu.snu.nemo.common.ir.edge.executionproperty.{DecoderProperty, EncoderProperty, KeyExtractorProperty} import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty import edu.snu.nemo.common.ir.vertex.{IRVertex, LoopVertex, OperatorVertex} import edu.snu.nemo.compiler.frontend.spark.SparkKeyExtractor -import edu.snu.nemo.compiler.frontend.spark.coder.SparkCoder +import edu.snu.nemo.compiler.frontend.spark.coder.{SparkDecoderFactory, SparkEncoderFactory} import edu.snu.nemo.compiler.frontend.spark.core.SparkFrontendUtils import edu.snu.nemo.compiler.frontend.spark.transform._ import org.apache.hadoop.io.WritableFactory @@ -51,8 +51,10 @@ final class RDD[T: ClassTag] protected[rdd] ( protected[rdd] val serializer: Serializer = SparkFrontendUtils.deriveSerializerFrom(_sc) private val loopVertexStack = new util.Stack[LoopVertex] - private val coderProperty: EdgeExecutionProperty[_ <: Serializable] = - CoderProperty.of(new SparkCoder[T](serializer)).asInstanceOf[EdgeExecutionProperty[_ <: Serializable]] + private val encoderProperty: EdgeExecutionProperty[_ <: Serializable] = + EncoderProperty.of(new SparkEncoderFactory[T](serializer)).asInstanceOf[EdgeExecutionProperty[_ <: Serializable]] + private val decoderProperty: EdgeExecutionProperty[_ <: Serializable] = + DecoderProperty.of(new SparkDecoderFactory[T](serializer)).asInstanceOf[EdgeExecutionProperty[_ <: Serializable]] private val keyExtractorProperty: KeyExtractorProperty = KeyExtractorProperty.of(new SparkKeyExtractor) /** @@ -137,7 +139,8 @@ 
final class RDD[T: ClassTag] protected[rdd] ( val newEdge: IREdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, mapVertex), lastVertex, mapVertex) - newEdge.setProperty(coderProperty) + newEdge.setProperty(encoderProperty) + newEdge.setProperty(decoderProperty) newEdge.setProperty(keyExtractorProperty) builder.connectVertices(newEdge) @@ -156,7 +159,8 @@ final class RDD[T: ClassTag] protected[rdd] ( val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, flatMapVertex), lastVertex, flatMapVertex) - newEdge.setProperty(coderProperty) + newEdge.setProperty(encoderProperty) + newEdge.setProperty(decoderProperty) newEdge.setProperty(keyExtractorProperty) builder.connectVertices(newEdge) @@ -186,7 +190,8 @@ final class RDD[T: ClassTag] protected[rdd] ( val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, reduceVertex), lastVertex, reduceVertex) - newEdge.setProperty(coderProperty) + newEdge.setProperty(encoderProperty) + newEdge.setProperty(decoderProperty) newEdge.setProperty(keyExtractorProperty) builder.connectVertices(newEdge) @@ -210,7 +215,8 @@ final class RDD[T: ClassTag] protected[rdd] ( builder.addVertex(flatMapVertex, loopVertexStack) val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, flatMapVertex), lastVertex, flatMapVertex) - newEdge.setProperty(coderProperty) + newEdge.setProperty(encoderProperty) + newEdge.setProperty(decoderProperty) newEdge.setProperty(keyExtractorProperty) builder.connectVertices(newEdge) diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java index 1d1a651..0ffddaa 100644 --- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java +++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java @@ -50,6 +50,7 @@ public final class CompressionPass extends AnnotatingPass { dag.topologicalDo(vertex -> dag.getIncomingEdgesOf(vertex).stream() .filter(e -> !vertex.getPropertyValue(StageIdProperty.class).get() .equals(e.getSrc().getPropertyValue(StageIdProperty.class).get())) + .filter(edge -> !edge.getPropertyValue(CompressionProperty.class).isPresent()) .forEach(edge -> edge.setProperty(CompressionProperty.of(compression)))); return dag; diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java similarity index 67% copy from compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java copy to compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java index 1d1a651..6ec2fde 100644 --- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java +++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java @@ -18,31 +18,22 @@ package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating; import edu.snu.nemo.common.dag.DAG; import edu.snu.nemo.common.ir.edge.IREdge; import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty; +import edu.snu.nemo.common.ir.edge.executionproperty.DecompressionProperty; import edu.snu.nemo.common.ir.vertex.IRVertex; import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty; /** - * A pass for applying compression algorithm for data flowing between vertices. + * A pass for applying decompression algorithm for data flowing between vertices. 
+ * It annotates each inter-stage edge that has a compression property, but no decompression property yet, with a decompression property matching its compression algorithm. */ -public final class CompressionPass extends AnnotatingPass { - private final CompressionProperty.Value compression; - - /** - * Default constructor. Uses LZ4 as default. - */ - public CompressionPass() { - super(CompressionProperty.class); - this.compression = CompressionProperty.Value.LZ4; - } +public final class DecompressionPass extends AnnotatingPass { /** * Constructor. - * @param compression Compression to apply on edges. */ - public CompressionPass(final CompressionProperty.Value compression) { + public DecompressionPass() { super(CompressionProperty.class); - this.compression = compression; } @Override @@ -50,7 +41,11 @@ public final class CompressionPass extends AnnotatingPass { dag.topologicalDo(vertex -> dag.getIncomingEdgesOf(vertex).stream() .filter(e -> !vertex.getPropertyValue(StageIdProperty.class).get() .equals(e.getSrc().getPropertyValue(StageIdProperty.class).get())) - .forEach(edge -> edge.setProperty(CompressionProperty.of(compression)))); + // Find edges that have a compression property but no decompression property.
+ .filter(edge -> edge.getPropertyValue(CompressionProperty.class).isPresent() + && !edge.getPropertyValue(DecompressionProperty.class).isPresent()) + .forEach(edge -> edge.setProperty(DecompressionProperty.of( + edge.getPropertyValue(CompressionProperty.class).get())))); return dag; } diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeDecoderPass.java similarity index 64% copy from compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java copy to compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeDecoderPass.java index eade93d..2283bfb 100644 --- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java +++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeDecoderPass.java @@ -15,32 +15,33 @@ */ package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating; -import edu.snu.nemo.common.coder.Coder; +import edu.snu.nemo.common.coder.DecoderFactory; import edu.snu.nemo.common.dag.DAG; import edu.snu.nemo.common.ir.edge.IREdge; -import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty; +import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty; import edu.snu.nemo.common.ir.vertex.IRVertex; /** - * Pass for initiating IREdge Coder ExecutionProperty with default dummy coder. + * Pass for initiating IREdge Decoder ExecutionProperty with default dummy coder. 
*/ -public final class DefaultEdgeCoderPass extends AnnotatingPass { +public final class DefaultEdgeDecoderPass extends AnnotatingPass { - private static final CoderProperty DEFAULT_CODER_PROPERTY = CoderProperty.of(Coder.DUMMY_CODER); + private static final DecoderProperty DEFAULT_DECODER_PROPERTY = + DecoderProperty.of(DecoderFactory.DUMMY_DECODER_FACTORY); /** * Default constructor. */ - public DefaultEdgeCoderPass() { - super(CoderProperty.class); + public DefaultEdgeDecoderPass() { + super(DecoderProperty.class); } @Override public DAG apply(final DAG dag) { dag.topologicalDo(irVertex -> dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> { - if (!irEdge.getPropertyValue(CoderProperty.class).isPresent()) { - irEdge.setProperty(DEFAULT_CODER_PROPERTY); + if (!irEdge.getPropertyValue(DecoderProperty.class).isPresent()) { + irEdge.setProperty(DEFAULT_DECODER_PROPERTY); } })); return dag; diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeEncoderPass.java similarity index 64% rename from compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java rename to compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeEncoderPass.java index eade93d..0238c72 100644 --- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java +++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeEncoderPass.java @@ -15,32 +15,33 @@ */ package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating; -import edu.snu.nemo.common.coder.Coder; +import edu.snu.nemo.common.coder.EncoderFactory; import edu.snu.nemo.common.dag.DAG; import edu.snu.nemo.common.ir.edge.IREdge; -import 
edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty; +import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty; import edu.snu.nemo.common.ir.vertex.IRVertex; /** - * Pass for initiating IREdge Coder ExecutionProperty with default dummy coder. + * Pass for initiating IREdge Encoder ExecutionProperty with default dummy coder. */ -public final class DefaultEdgeCoderPass extends AnnotatingPass { +public final class DefaultEdgeEncoderPass extends AnnotatingPass { - private static final CoderProperty DEFAULT_CODER_PROPERTY = CoderProperty.of(Coder.DUMMY_CODER); + private static final EncoderProperty DEFAULT_ENCODER_PROPERTY = + EncoderProperty.of(EncoderFactory.DUMMY_ENCODER_FACTORY); /** * Default constructor. */ - public DefaultEdgeCoderPass() { - super(CoderProperty.class); + public DefaultEdgeEncoderPass() { + super(EncoderProperty.class); } @Override public DAG apply(final DAG dag) { dag.topologicalDo(irVertex -> dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> { - if (!irEdge.getPropertyValue(CoderProperty.class).isPresent()) { - irEdge.setProperty(DEFAULT_CODER_PROPERTY); + if (!irEdge.getPropertyValue(EncoderProperty.class).isPresent()) { + irEdge.setProperty(DEFAULT_ENCODER_PROPERTY); } })); return dag; diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java index adf862a..b0ab2b1 100644 --- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java +++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java @@ -31,12 +31,14 @@ public final class PrimitiveCompositePass extends CompositePass { public PrimitiveCompositePass() { super(Arrays.asList( new DefaultParallelismPass(), // annotating after reshaping passes,
before stage partitioning - new DefaultEdgeCoderPass(), + new DefaultEdgeEncoderPass(), + new DefaultEdgeDecoderPass(), new DefaultStagePartitioningPass(), new ReviseInterStageEdgeDataStorePass(), // after stage partitioning new DefaultEdgeUsedDataHandlingPass(), new ScheduleGroupPass(), - new CompressionPass() + new CompressionPass(), + new DecompressionPass() )); } } diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java index 64b1060..eef0b20 100644 --- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java +++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java @@ -15,9 +15,11 @@ */ package edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping; -import edu.snu.nemo.common.coder.Coder; +import edu.snu.nemo.common.coder.DecoderFactory; +import edu.snu.nemo.common.coder.EncoderFactory; import edu.snu.nemo.common.ir.edge.IREdge; -import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty; +import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty; +import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty; import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty; import edu.snu.nemo.common.ir.vertex.IRVertex; import edu.snu.nemo.common.ir.vertex.OperatorVertex; @@ -150,9 +152,13 @@ public final class CommonSubexpressionEliminationPass extends ReshapingPass { outListToModify.remove(e); final IREdge newIrEdge = new IREdge(e.getPropertyValue(DataCommunicationPatternProperty.class).get(), operatorVertexToUse, e.getDst()); - final Optional coderProperty = e.getPropertyValue(CoderProperty.class); - if (coderProperty.isPresent()) { 
- newIrEdge.setProperty(CoderProperty.of(coderProperty.get())); + final Optional encoderProperty = e.getPropertyValue(EncoderProperty.class); + if (encoderProperty.isPresent()) { + newIrEdge.setProperty(EncoderProperty.of(encoderProperty.get())); + } + final Optional decoderProperty = e.getPropertyValue(DecoderProperty.class); + if (decoderProperty.isPresent()) { + newIrEdge.setProperty(DecoderProperty.of(decoderProperty.get())); } outListToModify.add(newIrEdge); }); diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java index 4139264..1117a7c 100644 --- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java +++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java @@ -18,7 +18,8 @@ package edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping; import edu.snu.nemo.common.dag.DAG; import edu.snu.nemo.common.dag.DAGBuilder; import edu.snu.nemo.common.ir.edge.IREdge; -import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty; +import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty; +import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty; import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty; import edu.snu.nemo.common.ir.vertex.IRVertex; import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex; @@ -66,7 +67,8 @@ public final class DataSkewReshapingPass extends ReshapingPass { // We then insert the dynamicOptimizationVertex between the vertex and incoming vertices. 
final IREdge newEdge = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, edge.getSrc(), metricCollectionBarrierVertex); - newEdge.setProperty(CoderProperty.of(edge.getPropertyValue(CoderProperty.class).get())); + newEdge.setProperty(EncoderProperty.of(edge.getPropertyValue(EncoderProperty.class).get())); + newEdge.setProperty(DecoderProperty.of(edge.getPropertyValue(DecoderProperty.class).get())); final IREdge edgeToGbK = new IREdge(edge.getPropertyValue(DataCommunicationPatternProperty.class).get(), metricCollectionBarrierVertex, v, edge.isSideInput()); diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java index 10d76e1..072c163 100644 --- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java +++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java @@ -17,7 +17,8 @@ package edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping; import edu.snu.nemo.common.ir.edge.IREdge; import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty; -import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty; +import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty; +import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty; import edu.snu.nemo.common.ir.vertex.IRVertex; import edu.snu.nemo.common.ir.vertex.LoopVertex; import edu.snu.nemo.common.dag.DAG; @@ -288,7 +289,8 @@ public final class LoopOptimizations { edgesToRemove.add(edge); final IREdge newEdge = new IREdge(edge.getPropertyValue(DataCommunicationPatternProperty.class).get(), candidate.getKey(), edge.getDst(), edge.isSideInput()); - newEdge.setProperty(CoderProperty.of(edge.getPropertyValue(CoderProperty.class).get())); + 
newEdge.setProperty(EncoderProperty.of(edge.getPropertyValue(EncoderProperty.class).get())); + newEdge.setProperty(DecoderProperty.of(edge.getPropertyValue(DecoderProperty.class).get())); edgesToAdd.add(newEdge); }); final List listToModify = inEdges.getOrDefault(loopVertex, new ArrayList<>()); diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java index e899da9..9bf434a 100644 --- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java +++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java @@ -17,7 +17,8 @@ package edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping; import edu.snu.nemo.common.dag.DAG; import edu.snu.nemo.common.dag.DAGBuilder; -import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty; +import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty; +import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty; import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty; import edu.snu.nemo.common.ir.edge.IREdge; import edu.snu.nemo.common.ir.vertex.IRVertex; @@ -63,7 +64,8 @@ public final class SailfishRelayReshapingPass extends ReshapingPass { edge.copyExecutionPropertiesTo(newEdgeToMerger); final IREdge newEdgeFromMerger = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, iFileMergerVertex, v); - newEdgeFromMerger.setProperty(CoderProperty.of(edge.getPropertyValue(CoderProperty.class).get())); + newEdgeFromMerger.setProperty(EncoderProperty.of(edge.getPropertyValue(EncoderProperty.class).get())); + newEdgeFromMerger.setProperty(DecoderProperty.of(edge.getPropertyValue(DecoderProperty.class).get())); 
builder.connectVertices(newEdgeToMerger); builder.connectVertices(newEdgeFromMerger); } else { diff --git a/examples/spark/src/main/java/edu/snu/nemo/examples/spark/sql/JavaUserDefinedTypedAggregation.java b/examples/spark/src/main/java/edu/snu/nemo/examples/spark/sql/JavaUserDefinedTypedAggregation.java index 00d2cb3..c6bec6a 100644 --- a/examples/spark/src/main/java/edu/snu/nemo/examples/spark/sql/JavaUserDefinedTypedAggregation.java +++ b/examples/spark/src/main/java/edu/snu/nemo/examples/spark/sql/JavaUserDefinedTypedAggregation.java @@ -193,7 +193,7 @@ public final class JavaUserDefinedTypedAggregation { } /** - * Specifies the Encoder for the intermediate value type. + * Specifies the EncoderFactory for the intermediate value type. * * @return buffer encoder. */ @@ -202,7 +202,7 @@ public final class JavaUserDefinedTypedAggregation { } /** - * Specifies the Encoder for the final output value type. + * Specifies the EncoderFactory for the final output value type. * * @return output encoder. 
*/ diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java index c9d3cc2..69f5a17 100644 --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java +++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java @@ -17,7 +17,9 @@ package edu.snu.nemo.runtime.executor; import com.google.protobuf.ByteString; import edu.snu.nemo.common.dag.DAG; -import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty; +import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty; +import edu.snu.nemo.common.ir.edge.executionproperty.DecompressionProperty; +import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty; import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty; import edu.snu.nemo.common.ir.vertex.IRVertex; import edu.snu.nemo.conf.JobConf; @@ -108,15 +110,21 @@ public final class Executor { new TaskStateManager(task, executorId, persistentConnectionToMasterMap, metricMessageSender); task.getTaskIncomingEdges().forEach(e -> serializerManager.register(e.getId(), - e.getPropertyValue(CoderProperty.class).get(), e.getPropertyValue(CompressionProperty.class) - .orElse(null))); + e.getPropertyValue(EncoderProperty.class).get(), + e.getPropertyValue(DecoderProperty.class).get(), + e.getPropertyValue(CompressionProperty.class).orElse(null), + e.getPropertyValue(DecompressionProperty.class).orElse(null))); task.getTaskOutgoingEdges().forEach(e -> serializerManager.register(e.getId(), - e.getPropertyValue(CoderProperty.class).get(), e.getPropertyValue(CompressionProperty.class). 
- orElse(null))); + e.getPropertyValue(EncoderProperty.class).get(), + e.getPropertyValue(DecoderProperty.class).get(), + e.getPropertyValue(CompressionProperty.class).orElse(null), + e.getPropertyValue(DecompressionProperty.class).orElse(null))); irDag.getVertices().forEach(v -> { irDag.getOutgoingEdgesOf(v).forEach(e -> serializerManager.register(e.getId(), - e.getPropertyValue(CoderProperty.class).get(), e.getPropertyValue(CompressionProperty.class) - .orElse(null))); + e.getPropertyValue(EncoderProperty.class).get(), + e.getPropertyValue(DecoderProperty.class).get(), + e.getPropertyValue(CompressionProperty.class).orElse(null), + e.getPropertyValue(DecompressionProperty.class).orElse(null))); }); new TaskExecutor( diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java index ee0aa46..69eb9ba 100644 --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java +++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java @@ -17,10 +17,12 @@ package edu.snu.nemo.runtime.executor.data; import com.google.common.io.CountingInputStream; import edu.snu.nemo.common.DirectByteArrayOutputStream; -import edu.snu.nemo.common.coder.Coder; +import edu.snu.nemo.common.coder.DecoderFactory; +import edu.snu.nemo.common.coder.EncoderFactory; import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition; import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition; -import edu.snu.nemo.runtime.executor.data.streamchainer.StreamChainer; +import edu.snu.nemo.runtime.executor.data.streamchainer.DecodeStreamChainer; +import edu.snu.nemo.runtime.executor.data.streamchainer.EncodeStreamChainer; import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,18 +49,19 @@ public final class DataUtil { /** * Serializes the elements in 
a non-serialized partition into an output stream. * - * @param coder the coder to encode the elements. + * @param encoderFactory the encoderFactory to encode the elements. * @param nonSerializedPartition the non-serialized partition to serialize. * @param bytesOutputStream the output stream to write. * @return total number of elements in the partition. * @throws IOException if fail to serialize. */ - public static long serializePartition(final Coder coder, + public static long serializePartition(final EncoderFactory encoderFactory, final NonSerializedPartition nonSerializedPartition, final OutputStream bytesOutputStream) throws IOException { long elementsCount = 0; + final EncoderFactory.Encoder encoder = encoderFactory.create(bytesOutputStream); for (final Object element : nonSerializedPartition.getData()) { - coder.encode(element, bytesOutputStream); + encoder.encode(element); elementsCount++; } @@ -77,9 +80,10 @@ public final class DataUtil { * @throws IOException if fail to deserialize. 
*/ public static NonSerializedPartition deserializePartition(final long elementsInPartition, - final Serializer serializer, - final K key, - final InputStream inputStream) throws IOException { + final Serializer serializer, + final K key, + final InputStream inputStream) + throws IOException { final List deserializedData = new ArrayList(); final InputStreamIterator iterator = new InputStreamIterator(Collections.singletonList(inputStream).iterator(), serializer, elementsInPartition); @@ -105,9 +109,10 @@ public final class DataUtil { for (final NonSerializedPartition partitionToConvert : partitionsToConvert) { try ( final DirectByteArrayOutputStream bytesOutputStream = new DirectByteArrayOutputStream(); - final OutputStream wrappedStream = buildOutputStream(bytesOutputStream, serializer.getStreamChainers()); + final OutputStream wrappedStream = buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers()); ) { - final long elementsTotal = serializePartition(serializer.getCoder(), partitionToConvert, wrappedStream); + final long elementsTotal = + serializePartition(serializer.getEncoderFactory(), partitionToConvert, wrappedStream); // We need to close wrappedStream on here, because DirectByteArrayOutputStream:getBufDirectly() returns // inner buffer directly, which can be an unfinished(not flushed) buffer. wrappedStream.close(); @@ -190,13 +195,14 @@ public final class DataUtil { } /** - * An iterator that emits objects from {@link InputStream} using the corresponding {@link Coder}. + * An iterator that emits objects from {@link InputStream} using the corresponding {@link DecoderFactory}. + * * @param The type of elements. 
*/ public static final class InputStreamIterator implements IteratorWithNumBytes { private final Iterator inputStreams; - private final Serializer serializer; + private final Serializer serializer; private final long limit; private volatile CountingInputStream serializedCountingStream = null; @@ -204,17 +210,19 @@ public final class DataUtil { private volatile boolean hasNext = false; private volatile T next; private volatile boolean cannotContinueDecoding = false; + private volatile DecoderFactory.Decoder decoder = null; private volatile long elementsDecoded = 0; private volatile long numSerializedBytes = 0; private volatile long numEncodedBytes = 0; /** - * Construct {@link Iterator} from {@link InputStream} and {@link Coder}. + * Construct {@link Iterator} from {@link InputStream} and {@link DecoderFactory}. * * @param inputStreams The streams to read data from. * @param serializer The serializer. */ - public InputStreamIterator(final Iterator inputStreams, final Serializer serializer) { + public InputStreamIterator(final Iterator inputStreams, + final Serializer serializer) { this.inputStreams = inputStreams; this.serializer = serializer; // -1 means no limit. @@ -222,7 +230,7 @@ public final class DataUtil { } /** - * Construct {@link Iterator} from {@link InputStream} and {@link Coder}. + * Construct {@link Iterator} from {@link InputStream} and {@link DecoderFactory}. * * @param inputStreams The streams to read data from. * @param serializer The serializer. 
@@ -230,7 +238,7 @@ public final class DataUtil { */ public InputStreamIterator( final Iterator inputStreams, - final Serializer serializer, + final Serializer serializer, final long limit) { if (limit < 0) { throw new IllegalArgumentException("Negative limit not allowed."); @@ -254,11 +262,12 @@ public final class DataUtil { } while (true) { try { - if (encodedCountingStream == null) { + if (decoder == null) { if (inputStreams.hasNext()) { serializedCountingStream = new CountingInputStream(inputStreams.next()); encodedCountingStream = new CountingInputStream(buildInputStream( - serializedCountingStream, serializer.getStreamChainers())); + serializedCountingStream, serializer.getDecodeStreamChainers())); + decoder = serializer.getDecoderFactory().create(encodedCountingStream); } else { cannotContinueDecoding = true; return false; @@ -269,7 +278,7 @@ public final class DataUtil { throw new RuntimeException(e); } try { - next = serializer.getCoder().decode(encodedCountingStream); + next = decoder.decode(); hasNext = true; elementsDecoded++; return true; @@ -279,6 +288,7 @@ public final class DataUtil { numEncodedBytes += encodedCountingStream.getCount(); serializedCountingStream = null; encodedCountingStream = null; + decoder = null; } } } @@ -313,50 +323,54 @@ public final class DataUtil { } /** - * Chain {@link InputStream} with {@link StreamChainer}s. + * Chain {@link InputStream} with {@link DecodeStreamChainer}s. * - * @param in the {@link InputStream}. - * @param streamChainers the list of {@link StreamChainer} to be applied on the stream. - * @return chained {@link InputStream}. - * @throws IOException if fail to create new stream. + * @param in the {@link InputStream}. + * @param decodeStreamChainers the list of {@link DecodeStreamChainer} to be applied on the stream. + * @return chained {@link InputStream}. + * @throws IOException if fail to create new stream. 
*/ - public static InputStream buildInputStream(final InputStream in, final List streamChainers) - throws IOException { + public static InputStream buildInputStream(final InputStream in, + final List decodeStreamChainers) + throws IOException { InputStream chained = in; - for (final StreamChainer streamChainer : streamChainers) { - chained = streamChainer.chainInput(chained); + for (final DecodeStreamChainer decodeStreamChainer : decodeStreamChainers) { + chained = decodeStreamChainer.chainInput(chained); } return chained; } /** - * Chain {@link OutputStream} with {@link StreamChainer}s. + * Chain {@link OutputStream} with {@link EncodeStreamChainer}s. * - * @param out the {@link OutputStream}. - * @param streamChainers the list of {@link StreamChainer} to be applied on the stream. - * @return chained {@link OutputStream}. - * @throws IOException if fail to create new stream. + * @param out the {@link OutputStream}. + * @param encodeStreamChainers the list of {@link EncodeStreamChainer} to be applied on the stream. + * @return chained {@link OutputStream}. + * @throws IOException if fail to create new stream.
*/ - public static OutputStream buildOutputStream(final OutputStream out, final List streamChainers) - throws IOException { + public static OutputStream buildOutputStream(final OutputStream out, + final List encodeStreamChainers) + throws IOException { OutputStream chained = out; - final List temporaryStreamChainerList = new ArrayList<>(streamChainers); - Collections.reverse(temporaryStreamChainerList); - for (final StreamChainer streamChainer : temporaryStreamChainerList) { - chained = streamChainer.chainOutput(chained); + final List temporaryEncodeStreamChainerList = new ArrayList<>(encodeStreamChainers); + Collections.reverse(temporaryEncodeStreamChainerList); + for (final EncodeStreamChainer encodeStreamChainer : temporaryEncodeStreamChainerList) { + chained = encodeStreamChainer.chainOutput(chained); } return chained; } /** * {@link Iterator} with interface to access to the number of bytes. + * * @param the type of decoded object */ public interface IteratorWithNumBytes extends Iterator { /** * Create an {@link IteratorWithNumBytes}, with no information about the number of bytes. + * * @param innerIterator {@link Iterator} to wrap - * @param the type of decoded object + * @param the type of decoded object * @return an {@link IteratorWithNumBytes}, with no information about the number of bytes */ static IteratorWithNumBytes of(final Iterator innerIterator) { @@ -385,10 +399,11 @@ public final class DataUtil { /** * Create an {@link IteratorWithNumBytes}, with the number of bytes in decoded and serialized form. 
- * @param innerIterator {@link Iterator} to wrap + * + * @param innerIterator {@link Iterator} to wrap * @param numSerializedBytes the number of bytes in serialized form - * @param numEncodedBytes the number of bytes in encoded form - * @param the type of decoded object + * @param numEncodedBytes the number of bytes in encoded form + * @param the type of decoded object * @return an {@link IteratorWithNumBytes}, with the information about the number of bytes */ static IteratorWithNumBytes of(final Iterator innerIterator, @@ -432,6 +447,7 @@ public final class DataUtil { /** * This method should be called after the actual data is taken out of iterator, * since the existence of an iterator does not guarantee that data inside it is ready. + * * @return the number of bytes in serialized form (which is, for example, encoded and compressed) * @throws NumBytesNotSupportedException when the operation is not supported * @throws IllegalStateException when the information is not ready @@ -441,6 +457,7 @@ public final class DataUtil { /** * This method should be called after the actual data is taken out of iterator, * since the existence of an iterator does not guarantee that data inside it is ready. 
+ * + * @return the number of bytes in encoded form (which is ready to be decoded) * @throws NumBytesNotSupportedException when the operation is not supported * @throws IllegalStateException when the information is not ready diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/SerializerManager.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/SerializerManager.java index 46c4609..5ea1bec 100644 --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/SerializerManager.java +++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/SerializerManager.java @@ -15,23 +15,22 @@ */ package edu.snu.nemo.runtime.executor.data; -import edu.snu.nemo.runtime.executor.data.streamchainer.CompressionStreamChainer; -import edu.snu.nemo.runtime.executor.data.streamchainer.StreamChainer; -import edu.snu.nemo.common.coder.Coder; +import edu.snu.nemo.common.coder.DecoderFactory; +import edu.snu.nemo.common.coder.EncoderFactory; +import edu.snu.nemo.runtime.executor.data.streamchainer.*; import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty; -import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import javax.inject.Inject; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; /** - * Mapping from RuntimeEdgeId to Coder. + * Mapping from RuntimeEdgeId to {@link Serializer}. */ public final class SerializerManager { private static final Logger LOG = LoggerFactory.getLogger(SerializerManager.class.getName()); @@ -45,40 +44,53 @@ public final class SerializerManager { } /** - * Register a coder for runtime edge. + * Register an encoder factory and a decoder factory for a runtime edge. + * Compression and decompression are not applied on this edge.
* - * @param runtimeEdgeId id of the runtime edge. - * @param coder the corresponding coder. + * @param runtimeEdgeId id of the runtime edge. + * @param encoderFactory the corresponding encoder factory. + * @param decoderFactory the corresponding decoder factory. */ public void register(final String runtimeEdgeId, - final Coder coder) { - register(runtimeEdgeId, coder, null); + final EncoderFactory encoderFactory, + final DecoderFactory decoderFactory) { + register(runtimeEdgeId, encoderFactory, decoderFactory, null, null); } /** - * Register a coder for runtime edge. + * Register an encoder factory and a decoder factory for a runtime edge. * - * @param runtimeEdgeId id of the runtime edge. - * @param coder the corresponding coder. + * @param runtimeEdgeId id of the runtime edge. + * @param encoderFactory the corresponding encoder factory. + * @param decoderFactory the corresponding decoder factory. * @param compressionProperty compression property, or null not to enable compression + * @param decompressionProperty decompression property, or null not to enable decompression */ public void register(final String runtimeEdgeId, - final Coder coder, - final CompressionProperty.Value compressionProperty) { + final EncoderFactory encoderFactory, + final DecoderFactory decoderFactory, + @Nullable final CompressionProperty.Value compressionProperty, + @Nullable final CompressionProperty.Value decompressionProperty) { LOG.debug("{} edge id registering to SerializerManager", runtimeEdgeId); - final Serializer serializer = new Serializer(coder, Collections.emptyList()); - runtimeEdgeIdToSerializer.putIfAbsent(runtimeEdgeId, serializer); - final List streamChainerList = new ArrayList<>(); + final List encodeStreamChainers = new ArrayList<>(); + final List decodeStreamChainers = new ArrayList<>(); // Compression chain if (compressionProperty != null) { LOG.debug("Adding {} compression chain for {}", compressionProperty, runtimeEdgeId); - streamChainerList.add(new
CompressionStreamChainer(compressionProperty)); + encodeStreamChainers.add(new CompressionStreamChainer(compressionProperty)); + } + if (decompressionProperty != null) { + LOG.debug("Adding {} decompression chain for {}", + decompressionProperty, runtimeEdgeId); + decodeStreamChainers.add(new DecompressionStreamChainer(decompressionProperty)); } - serializer.setStreamChainers(streamChainerList); + final Serializer serializer = + new Serializer(encoderFactory, decoderFactory, encodeStreamChainers, decodeStreamChainers); + runtimeEdgeIdToSerializer.putIfAbsent(runtimeEdgeId, serializer); } /** diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java index 200e465..d6d63d1 100644 --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java +++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java @@ -16,7 +16,7 @@ package edu.snu.nemo.runtime.executor.data.partition; import edu.snu.nemo.common.DirectByteArrayOutputStream; -import edu.snu.nemo.common.coder.Coder; +import edu.snu.nemo.common.coder.EncoderFactory; import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer; import javax.annotation.Nullable; @@ -39,7 +39,7 @@ public final class SerializedPartition implements Partition { // Will be null when the partition is committed when it is constructed. @Nullable private final DirectByteArrayOutputStream bytesOutputStream; @Nullable private final OutputStream wrappedStream; - @Nullable private final Coder coder; + @Nullable private final EncoderFactory.Encoder encoder; /** * Creates a serialized {@link Partition} without actual data. 
@@ -57,8 +57,8 @@ public final class SerializedPartition implements Partition { this.length = 0; this.committed = false; this.bytesOutputStream = new DirectByteArrayOutputStream(); - this.wrappedStream = buildOutputStream(bytesOutputStream, serializer.getStreamChainers()); - this.coder = serializer.getCoder(); + this.wrappedStream = buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers()); + this.encoder = serializer.getEncoderFactory().create(wrappedStream); } /** @@ -81,7 +81,7 @@ public final class SerializedPartition implements Partition { this.committed = true; this.bytesOutputStream = null; this.wrappedStream = null; - this.coder = null; + this.encoder = null; } /** @@ -96,7 +96,7 @@ public final class SerializedPartition implements Partition { throw new IOException("The partition is already committed!"); } else { try { - coder.encode(element, wrappedStream); + encoder.encode(element); elementsCount++; } catch (final IOException e) { wrappedStream.close(); diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java index c5c2e92..467ba18 100644 --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java +++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java @@ -17,19 +17,16 @@ package edu.snu.nemo.runtime.executor.data.streamchainer; import edu.snu.nemo.common.exception.UnsupportedCompressionException; import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty; -import net.jpountz.lz4.LZ4BlockInputStream; import net.jpountz.lz4.LZ4BlockOutputStream; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; -import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; /** - * {@link StreamChainer} 
for applying compression. + * {@link EncodeStreamChainer} for applying compression. */ -public class CompressionStreamChainer implements StreamChainer { +public class CompressionStreamChainer implements EncodeStreamChainer { private final CompressionProperty.Value compression; /** @@ -42,18 +39,6 @@ public class CompressionStreamChainer implements StreamChainer { } @Override - public final InputStream chainInput(final InputStream in) throws IOException { - switch (compression) { - case Gzip: - return new GZIPInputStream(in); - case LZ4: - return new LZ4BlockInputStream(in); - default: - throw new UnsupportedCompressionException("Not supported compression method"); - } - } - - @Override public final OutputStream chainOutput(final OutputStream out) throws IOException { switch (compression) { case Gzip: diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/StreamChainer.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecodeStreamChainer.java similarity index 67% copy from runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/StreamChainer.java copy to runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecodeStreamChainer.java index 2dbeec7..6084b6e 100644 --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/StreamChainer.java +++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecodeStreamChainer.java @@ -17,13 +17,13 @@ package edu.snu.nemo.runtime.executor.data.streamchainer; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; /** - * A {@link StreamChainer} object indicates each stream manipulation strategy. - * Stream can be chained by {@link StreamChainer} multiple times. + * A {@link DecodeStreamChainer} object indicates each stream manipulation strategy. + * Stream can be chained by {@link DecodeStreamChainer} multiple times. 
*/ -public interface StreamChainer { +public interface DecodeStreamChainer { + /** * Chain {@link InputStream} and returns chained {@link InputStream}. * @@ -32,13 +32,4 @@ public interface StreamChainer { * @throws IOException if fail to chain the stream. */ InputStream chainInput(InputStream in) throws IOException; - - /** - * Chain {@link OutputStream} and returns chained {@link OutputStream}. - * - * @param out the stream which will be chained. - * @return chained {@link OutputStream}. - * @throws IOException if fail to chain the stream. - */ - OutputStream chainOutput(OutputStream out) throws IOException; } diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecompressionStreamChainer.java similarity index 69% copy from runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java copy to runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecompressionStreamChainer.java index c5c2e92..558bd35 100644 --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java +++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecompressionStreamChainer.java @@ -18,18 +18,15 @@ package edu.snu.nemo.runtime.executor.data.streamchainer; import edu.snu.nemo.common.exception.UnsupportedCompressionException; import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty; import net.jpountz.lz4.LZ4BlockInputStream; -import net.jpountz.lz4.LZ4BlockOutputStream; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; /** - * {@link StreamChainer} for applying compression. + * {@link DecodeStreamChainer} for applying decompression.
*/ -public class CompressionStreamChainer implements StreamChainer { +public class DecompressionStreamChainer implements DecodeStreamChainer { private final CompressionProperty.Value compression; /** @@ -37,7 +34,7 @@ public class CompressionStreamChainer implements StreamChainer { * * @param compression compression method. */ - public CompressionStreamChainer(final CompressionProperty.Value compression) { + public DecompressionStreamChainer(final CompressionProperty.Value compression) { this.compression = compression; } @@ -52,16 +49,4 @@ public class CompressionStreamChainer implements StreamChainer { throw new UnsupportedCompressionException("Not supported compression method"); } } - - @Override - public final OutputStream chainOutput(final OutputStream out) throws IOException { - switch (compression) { - case Gzip: - return new GZIPOutputStream(out); - case LZ4: - return new LZ4BlockOutputStream(out); - default: - throw new UnsupportedCompressionException("Not supported compression method"); - } - } } diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/StreamChainer.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/EncodeStreamChainer.java similarity index 67% rename from runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/StreamChainer.java rename to runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/EncodeStreamChainer.java index 2dbeec7..b0c7547 100644 --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/StreamChainer.java +++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/EncodeStreamChainer.java @@ -16,22 +16,13 @@ package edu.snu.nemo.runtime.executor.data.streamchainer; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; /** - * A {@link StreamChainer} object indicates each stream manipulation strategy. 
- * Stream can be chained by {@link StreamChainer} multiple times. + * An {@link EncodeStreamChainer} object indicates each stream manipulation strategy. + * Stream can be chained by {@link EncodeStreamChainer} multiple times. */ -public interface StreamChainer { - /** - * Chain {@link InputStream} and returns chained {@link InputStream}. - * - * @param in the stream which will be chained. - * @return chained {@link InputStream}. - * @throws IOException if fail to chain the stream. - */ - InputStream chainInput(InputStream in) throws IOException; +public interface EncodeStreamChainer { /** * Chain {@link OutputStream} and returns chained {@link OutputStream}. diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/Serializer.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/Serializer.java index d79d28f..e30147a 100644 --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/Serializer.java +++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/Serializer.java @@ -15,53 +15,65 @@ */ package edu.snu.nemo.runtime.executor.data.streamchainer; -import edu.snu.nemo.common.coder.Coder; +import edu.snu.nemo.common.coder.DecoderFactory; +import edu.snu.nemo.common.coder.EncoderFactory; import java.util.List; /** - * class that contains {@link Coder} and {@link List} of {@link StreamChainer}. - * @param coder element type. + * Class that contains an {@link EncoderFactory}, a {@link DecoderFactory}, and the {@link List}s of {@link EncodeStreamChainer} and {@link DecodeStreamChainer}. + * @param encoderFactory element type. + * @param decoderFactory element type. */ -public final class Serializer { - private Coder coder; - private List streamChainers; +public final class Serializer { + private final EncoderFactory encoderFactory; + private final DecoderFactory decoderFactory; + private final List encodeStreamChainers; + private final List decodeStreamChainers; /** * Constructor.
* - * @param coder {@link Coder}. - * @param streamChainers list of {@link StreamChainer}. + * @param encoderFactory {@link EncoderFactory}. + * @param decoderFactory {@link DecoderFactory}. + * @param encodeStreamChainers the list of {@link EncodeStreamChainer} to use for encoding. + * @param decodeStreamChainers the list of {@link DecodeStreamChainer} to use for decoding. */ - public Serializer(final Coder coder, final List streamChainers) { - this.coder = coder; - this.streamChainers = streamChainers; + public Serializer(final EncoderFactory encoderFactory, + final DecoderFactory decoderFactory, + final List encodeStreamChainers, + final List decodeStreamChainers) { + this.encoderFactory = encoderFactory; + this.decoderFactory = decoderFactory; + this.encodeStreamChainers = encodeStreamChainers; + this.decodeStreamChainers = decodeStreamChainers; } /** - * method that returns {@link Coder}. - * - * @return {@link Coder}. + * @return the {@link EncoderFactory} to use. */ - public Coder getCoder() { - return coder; + public EncoderFactory getEncoderFactory() { + return encoderFactory; } /** - * method that returns list of {@link StreamChainer}. - * - * @return list of {@link StreamChainer}. + * @return the {@link DecoderFactory} to use. */ - public List getStreamChainers() { - return streamChainers; + public DecoderFactory getDecoderFactory() { + return decoderFactory; } /** - * method that sets list of {@link StreamChainer}. - * - * @param streamChainers list of {@link StreamChainer}. + * @return the list of {@link EncodeStreamChainer} for encoding. + */ + public List getEncodeStreamChainers() { + return encodeStreamChainers; + } + + /** + * @return the list of {@link DecodeStreamChainer} for decoding.
*/
-  public void setStreamChainers(final List streamChainers) {
-    this.streamChainers = streamChainers;
+  public List getDecodeStreamChainers() {
+    return decodeStreamChainers;
   }
 }
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
index 74c2504..b9f0c63 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
@@ -16,11 +16,9 @@
 package edu.snu.nemo.runtime.executor.data;
 
 import edu.snu.nemo.common.Pair;
-import edu.snu.nemo.common.coder.IntCoder;
-import edu.snu.nemo.common.coder.PairCoder;
+import edu.snu.nemo.common.coder.*;
 import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
 import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.data.HashRange;
 import edu.snu.nemo.runtime.common.data.KeyRange;
@@ -30,6 +28,7 @@ import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
 import edu.snu.nemo.runtime.common.state.BlockState;
 import edu.snu.nemo.runtime.executor.data.block.Block;
 import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
+import edu.snu.nemo.runtime.executor.data.streamchainer.DecompressionStreamChainer;
 import edu.snu.nemo.runtime.executor.data.streamchainer.CompressionStreamChainer;
 import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
 import edu.snu.nemo.runtime.executor.data.stores.*;
@@ -71,9 +70,11 @@ import static org.mockito.Mockito.when;
 @PrepareForTest({BlockManagerMaster.class, RuntimeMaster.class, SerializerManager.class})
 public final class BlockStoreTest {
   private static final String TMP_FILE_DIRECTORY = "./tmpFiles";
-  private static final Coder CODER = PairCoder.of(IntCoder.of(), IntCoder.of());
-  private static final Serializer SERIALIZER = new Serializer(CODER,
-      Collections.singletonList(new CompressionStreamChainer(CompressionProperty.Value.LZ4)));
+  private static final Serializer SERIALIZER = new Serializer(
+      PairEncoderFactory.of(IntEncoderFactory.of(), IntEncoderFactory.of()),
+      PairDecoderFactory.of(IntDecoderFactory.of(), IntDecoderFactory.of()),
+      Collections.singletonList(new CompressionStreamChainer(CompressionProperty.Value.LZ4)),
+      Collections.singletonList(new DecompressionStreamChainer(CompressionProperty.Value.LZ4)));
   private static final SerializerManager serializerManager = mock(SerializerManager.class);
   private BlockManagerMaster blockManagerMaster;
   private LocalMessageDispatcher messageDispatcher;
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index 012701c..0b119d8 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -15,8 +15,7 @@
  */
 package edu.snu.nemo.runtime.executor.datatransfer;
 
-import edu.snu.nemo.common.coder.IntCoder;
-import edu.snu.nemo.common.coder.PairCoder;
+import edu.snu.nemo.common.coder.*;
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.*;
@@ -26,7 +25,6 @@ import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import edu.snu.nemo.common.test.EmptyComponents;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.Pair;
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
@@ -43,9 +41,6 @@ import edu.snu.nemo.runtime.executor.Executor;
 import edu.snu.nemo.runtime.executor.MetricManagerWorker;
 import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
 import edu.snu.nemo.runtime.executor.data.SerializerManager;
-import edu.snu.nemo.runtime.executor.datatransfer.DataTransferFactory;
-import edu.snu.nemo.runtime.executor.datatransfer.InputReader;
-import edu.snu.nemo.runtime.executor.datatransfer.OutputWriter;
 import edu.snu.nemo.runtime.master.MetricMessageHandler;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
@@ -105,7 +100,9 @@ public final class DataTransferTest {
   private static final int PARALLELISM_TEN = 10;
   private static final String EDGE_PREFIX_TEMPLATE = "Dummy(%d)";
   private static final AtomicInteger TEST_INDEX = new AtomicInteger(0);
-  private static final Coder CODER = PairCoder.of(IntCoder.of(), IntCoder.of());
+  private static final EncoderFactory ENCODER_FACTORY = PairEncoderFactory.of(IntEncoderFactory.of(), IntEncoderFactory.of());
+  private static final DecoderFactory DECODER_FACTORY =
+      PairDecoderFactory.of(IntDecoderFactory.of(), IntDecoderFactory.of());
   private static final Tang TANG = Tang.Factory.getTang();
   private static final int HASH_RANGE_MULTIPLIER = 10;
 
@@ -307,14 +304,14 @@ public final class DataTransferTest {
 
     // Edge setup
     final IREdge dummyIREdge = new IREdge(commPattern, srcVertex, dstVertex);
-    dummyIREdge.setProperty(CoderProperty.of(CODER));
     dummyIREdge.setProperty(KeyExtractorProperty.of((element -> element)));
+    dummyIREdge.setProperty(DataCommunicationPatternProperty.of(commPattern));
+    dummyIREdge.setProperty(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
+    dummyIREdge.setProperty(DataStoreProperty.of(store));
+    dummyIREdge.setProperty(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
+    dummyIREdge.setProperty(EncoderProperty.of(ENCODER_FACTORY));
+    dummyIREdge.setProperty(DecoderProperty.of(DECODER_FACTORY));
     final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
-    edgeProperties.put(DataCommunicationPatternProperty.of(commPattern));
-    edgeProperties.put(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
-    edgeProperties.put(DataStoreProperty.of(store));
-    edgeProperties.put(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
-    edgeProperties.put(CoderProperty.of(CODER));
     final RuntimeEdge dummyEdge;
 
     final IRVertex srcMockVertex = mock(IRVertex.class);
@@ -398,20 +395,20 @@ public final class DataTransferTest {
 
     // Edge setup
     final IREdge dummyIREdge = new IREdge(commPattern, srcVertex, dstVertex);
-    dummyIREdge.setProperty(CoderProperty.of(CODER));
+    dummyIREdge.setProperty(EncoderProperty.of(ENCODER_FACTORY));
+    dummyIREdge.setProperty(DecoderProperty.of(DECODER_FACTORY));
     dummyIREdge.setProperty(KeyExtractorProperty.of((element -> element)));
-    final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
-    edgeProperties.put(DataCommunicationPatternProperty.of(commPattern));
-    edgeProperties.put(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
-    edgeProperties.put(DuplicateEdgeGroupProperty.of(new DuplicateEdgeGroupPropertyValue("dummy")));
+    dummyIREdge.setProperty(DataCommunicationPatternProperty.of(commPattern));
+    dummyIREdge.setProperty(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
+    dummyIREdge.setProperty(DuplicateEdgeGroupProperty.of(new DuplicateEdgeGroupPropertyValue("dummy")));
     final Optional duplicateDataProperty
-        = edgeProperties.get(DuplicateEdgeGroupProperty.class);
+        = dummyIREdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
     duplicateDataProperty.get().setRepresentativeEdgeId(edgeId);
     duplicateDataProperty.get().setGroupSize(2);
-
-    edgeProperties.put(DataStoreProperty.of(store));
-    edgeProperties.put(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
+    dummyIREdge.setProperty(DataStoreProperty.of(store));
+    dummyIREdge.setProperty(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
     final RuntimeEdge dummyEdge, dummyEdge2;
+    final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
 
     final IRVertex srcMockVertex = mock(IRVertex.class);
     final IRVertex dstMockVertex = mock(IRVertex.class);
@@ -523,8 +520,8 @@ public final class DataTransferTest {
   private Pair setupVertices(final String edgeId,
       final BlockManagerWorker sender,
       final BlockManagerWorker receiver) {
-    serializerManagers.get(sender).register(edgeId, CODER);
-    serializerManagers.get(receiver).register(edgeId, CODER);
+    serializerManagers.get(sender).register(edgeId, ENCODER_FACTORY, DECODER_FACTORY);
+    serializerManagers.get(receiver).register(edgeId, ENCODER_FACTORY, DECODER_FACTORY);
 
     // Src setup
     final SourceVertex srcVertex = new EmptyComponents.EmptySourceVertex("Source");
@@ -543,10 +540,10 @@ public final class DataTransferTest {
       final String edgeId2,
       final BlockManagerWorker sender,
       final BlockManagerWorker receiver) {
-    serializerManagers.get(sender).register(edgeId, CODER);
-    serializerManagers.get(receiver).register(edgeId, CODER);
-    serializerManagers.get(sender).register(edgeId2, CODER);
-    serializerManagers.get(receiver).register(edgeId2, CODER);
+    serializerManagers.get(sender).register(edgeId, ENCODER_FACTORY, DECODER_FACTORY);
+    serializerManagers.get(receiver).register(edgeId, ENCODER_FACTORY, DECODER_FACTORY);
+    serializerManagers.get(sender).register(edgeId2, ENCODER_FACTORY, DECODER_FACTORY);
+    serializerManagers.get(receiver).register(edgeId2, ENCODER_FACTORY, DECODER_FACTORY);
 
     // Src setup
     final SourceVertex srcVertex = new EmptyComponents.EmptySourceVertex("Source");
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
index 55d1d3f..998f3aa 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -17,7 +17,6 @@ package edu.snu.nemo.runtime.executor.task;
 
 import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.ir.OutputCollector;
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.Readable;
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPassTest.java
index b403f7b..002873c 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPassTest.java
@@ -16,13 +16,16 @@
 package edu.snu.nemo.tests.compiler.optimizer.pass.compiletime.annotating;
 
 import edu.snu.nemo.client.JobLauncher;
-import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.coder.EncoderFactory;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AnnotatingPass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultEdgeCoderPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultEdgeDecoderPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultEdgeEncoderPass;
 import edu.snu.nemo.tests.compiler.CompilerTestUtil;
 import org.junit.Before;
 import org.junit.Test;
@@ -33,7 +36,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import static org.junit.Assert.assertEquals;
 
 /**
- * Test {@link edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultEdgeCoderPass}.
+ * Test {@link DefaultEdgeEncoderPass} and {@link DefaultEdgeDecoderPass}.
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
@@ -47,33 +50,43 @@ public class DefaultEdgeCoderPassTest {
 
   @Test
   public void testAnnotatingPass() {
-    final AnnotatingPass coderPass = new DefaultEdgeCoderPass();
-    assertEquals(CoderProperty.class, coderPass.getExecutionPropertyToModify());
+    final AnnotatingPass encoderPass = new DefaultEdgeEncoderPass();
+    assertEquals(EncoderProperty.class, encoderPass.getExecutionPropertyToModify());
+    final AnnotatingPass decoderPass = new DefaultEdgeDecoderPass();
+    assertEquals(DecoderProperty.class, decoderPass.getExecutionPropertyToModify());
   }
 
   @Test
   public void testNotOverride() {
     // Get the first coder from the compiled DAG
-    final Coder compiledCoder = compiledDAG
-        .getOutgoingEdgesOf(compiledDAG.getTopologicalSort().get(0)).get(0).getPropertyValue(CoderProperty.class).get();
-    final DAG processedDAG = new DefaultEdgeCoderPass().apply(compiledDAG);
+    final IREdge irEdge = compiledDAG.getOutgoingEdgesOf(compiledDAG.getTopologicalSort().get(0)).get(0);
+    final EncoderFactory compiledEncoderFactory = irEdge.getPropertyValue(EncoderProperty.class).get();
+    final DecoderFactory compiledDecoderFactory = irEdge.getPropertyValue(DecoderProperty.class).get();
+    DAG processedDAG = new DefaultEdgeEncoderPass().apply(compiledDAG);
+    processedDAG = new DefaultEdgeDecoderPass().apply(processedDAG);
 
     // Get the first coder from the processed DAG
-    final Coder processedCoder = processedDAG.getOutgoingEdgesOf(processedDAG.getTopologicalSort().get(0))
-        .get(0).getPropertyValue(CoderProperty.class).get();
-    assertEquals(compiledCoder, processedCoder); // It must not be changed.
+    final IREdge processedIREdge = processedDAG.getOutgoingEdgesOf(processedDAG.getTopologicalSort().get(0)).get(0);
+    final EncoderFactory processedEncoderFactory = processedIREdge.getPropertyValue(EncoderProperty.class).get();
+    assertEquals(compiledEncoderFactory, processedEncoderFactory); // It must not be changed.
+    final DecoderFactory processedDecoderFactory = processedIREdge.getPropertyValue(DecoderProperty.class).get();
+    assertEquals(compiledDecoderFactory, processedDecoderFactory); // It must not be changed.
   }
 
   @Test
   public void testSetToDefault() throws Exception {
     // Remove the first coder from the compiled DAG (to let our pass to set as default coder).
-    compiledDAG.getOutgoingEdgesOf(compiledDAG.getTopologicalSort().get(0))
-        .get(0).getExecutionProperties().remove(CoderProperty.class);
-    final DAG processedDAG = new DefaultEdgeCoderPass().apply(compiledDAG);
+    final IREdge irEdge = compiledDAG.getOutgoingEdgesOf(compiledDAG.getTopologicalSort().get(0)).get(0);
+    irEdge.getExecutionProperties().remove(EncoderProperty.class);
+    irEdge.getExecutionProperties().remove(DecoderProperty.class);
+    DAG processedDAG = new DefaultEdgeEncoderPass().apply(compiledDAG);
+    processedDAG = new DefaultEdgeDecoderPass().apply(processedDAG);
 
-    // Check whether the pass set the empty coder to our default coder.
-    final Coder processedCoder = processedDAG.getOutgoingEdgesOf(processedDAG.getTopologicalSort().get(0))
-        .get(0).getPropertyValue(CoderProperty.class).get();
-    assertEquals(Coder.DUMMY_CODER, processedCoder);
+    // Check whether the pass set the empty coder to our default encoder & decoder.
+    final IREdge processedIREdge = processedDAG.getOutgoingEdgesOf(processedDAG.getTopologicalSort().get(0)).get(0);
+    final EncoderFactory processedEncoderFactory = processedIREdge.getPropertyValue(EncoderProperty.class).get();
+    final DecoderFactory processedDecoderFactory = processedIREdge.getPropertyValue(DecoderProperty.class).get();
+    assertEquals(EncoderFactory.DUMMY_ENCODER_FACTORY, processedEncoderFactory);
+    assertEquals(DecoderFactory.DUMMY_DECODER_FACTORY, processedDecoderFactory);
   }
 }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
index 8a6b1a8..d2d6954 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
@@ -20,7 +20,8 @@ import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.LoopVertex;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping.LoopExtractionPass;
@@ -114,13 +115,15 @@ public class LoopFusionPassTest {
     loopVertexToFollow.getIterativeIncomingEdges().values().forEach(irEdges -> irEdges.forEach(irEdge -> {
       final IREdge newIREdge = new IREdge(irEdge.getPropertyValue(DataCommunicationPatternProperty.class).get(),
           vertexToBeFollowed, loopVertexToFollow);
-      newIREdge.setProperty(CoderProperty.of(irEdge.getPropertyValue(CoderProperty.class).get()));
+      newIREdge.setProperty(EncoderProperty.of(irEdge.getPropertyValue(EncoderProperty.class).get()));
+      newIREdge.setProperty(DecoderProperty.of(irEdge.getPropertyValue(DecoderProperty.class).get()));
       builder.connectVertices(newIREdge);
     }));
     loopVertexToFollow.getNonIterativeIncomingEdges().values().forEach(irEdges -> irEdges.forEach(irEdge -> {
       final IREdge newIREdge = new IREdge(irEdge.getPropertyValue(DataCommunicationPatternProperty.class).get(),
           irEdge.getSrc(), loopVertexToFollow);
-      newIREdge.setProperty(CoderProperty.of(irEdge.getPropertyValue(CoderProperty.class).get()));
+      newIREdge.setProperty(EncoderProperty.of(irEdge.getPropertyValue(EncoderProperty.class).get()));
+      newIREdge.setProperty(DecoderProperty.of(irEdge.getPropertyValue(DecoderProperty.class).get()));
       builder.connectVertices(newIREdge);
     }));
   }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
index de6ff56..a461873 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
@@ -20,7 +20,8 @@ import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.LoopVertex;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping.LoopExtractionPass;
@@ -92,7 +93,8 @@ public class LoopInvariantCodeMotionPassTest {
         assertTrue(incomingEdge.isPresent());
         final IREdge newIREdge = new IREdge(incomingEdge.get().getPropertyValue(
             DataCommunicationPatternProperty.class).get(), incomingEdge.get().getSrc(), alsLoop);
-        newIREdge.setProperty(CoderProperty.of(incomingEdge.get().getPropertyValue(CoderProperty.class).get()));
+        newIREdge.setProperty(EncoderProperty.of(incomingEdge.get().getPropertyValue(EncoderProperty.class).get()));
+        newIREdge.setProperty(DecoderProperty.of(incomingEdge.get().getPropertyValue(DecoderProperty.class).get()));
         builder.connectVertices(newIREdge);
       }
     });
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
index 793c182..8d13374 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
@@ -29,21 +29,21 @@ public final class PolicyBuilderTest {
   @Test
   public void testDisaggregationPolicy() {
     final Policy disaggregationPolicy = new DisaggregationPolicy();
-    assertEquals(13, disaggregationPolicy.getCompileTimePasses().size());
+    assertEquals(15, disaggregationPolicy.getCompileTimePasses().size());
     assertEquals(0, disaggregationPolicy.getRuntimePasses().size());
   }
 
   @Test
   public void testPadoPolicy() {
     final Policy padoPolicy = new PadoPolicy();
-    assertEquals(15, padoPolicy.getCompileTimePasses().size());
+    assertEquals(17, padoPolicy.getCompileTimePasses().size());
     assertEquals(0, padoPolicy.getRuntimePasses().size());
   }
 
   @Test
   public void testDataSkewPolicy() {
     final Policy dataSkewPolicy = new DataSkewPolicy();
-    assertEquals(17, dataSkewPolicy.getCompileTimePasses().size());
+    assertEquals(19, dataSkewPolicy.getCompileTimePasses().size());
     assertEquals(1, dataSkewPolicy.getRuntimePasses().size());
   }
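The test updates in this diff replace each single `Coder` with a matched pair of factories, for example `PairEncoderFactory.of(IntEncoderFactory.of(), IntEncoderFactory.of())` next to `PairDecoderFactory.of(IntDecoderFactory.of(), IntDecoderFactory.of())`. As a rough illustration of the instance-based idea in the commit message, here is a minimal sketch in which a stateless factory binds one encoder (or decoder) instance to a stream, so per-stream setup happens once per `create` call instead of once per element. The interface shapes (`create(OutputStream)`, `encode`, `decode`), the class name `InstanceCoderSketch`, and the use of `Map.Entry` in place of Nemo's `Pair` are all assumptions made for illustration; this is not Nemo's actual API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch, not Nemo's API: factories create one encoder/decoder per stream. */
public final class InstanceCoderSketch {
  interface Encoder<T> { void encode(T element) throws IOException; }
  interface Decoder<T> { T decode() throws IOException; }
  interface EncoderFactory<T> { Encoder<T> create(OutputStream out) throws IOException; }
  interface DecoderFactory<T> { Decoder<T> create(InputStream in) throws IOException; }

  // Writes each Integer as 4 big-endian bytes; the DataOutputStream is built once per stream.
  static EncoderFactory<Integer> intEncoderFactory() {
    return out -> {
      final DataOutputStream dataOut = new DataOutputStream(out);
      return element -> dataOut.writeInt(element);
    };
  }

  // Reads 4 big-endian bytes per Integer through a per-stream DataInputStream.
  static DecoderFactory<Integer> intDecoderFactory() {
    return in -> {
      final DataInputStream dataIn = new DataInputStream(in);
      return dataIn::readInt;
    };
  }

  // Composes two element factories; both inner instances write to the same stream.
  static <A, B> EncoderFactory<Map.Entry<A, B>> pairEncoderFactory(
      final EncoderFactory<A> left, final EncoderFactory<B> right) {
    return out -> {
      final Encoder<A> leftEncoder = left.create(out);
      final Encoder<B> rightEncoder = right.create(out);
      return pair -> {
        leftEncoder.encode(pair.getKey());
        rightEncoder.encode(pair.getValue());
      };
    };
  }

  // Decodes in the order the pair encoder wrote: key first, then value.
  static <A, B> DecoderFactory<Map.Entry<A, B>> pairDecoderFactory(
      final DecoderFactory<A> left, final DecoderFactory<B> right) {
    return in -> {
      final Decoder<A> leftDecoder = left.create(in);
      final Decoder<B> rightDecoder = right.create(in);
      return () -> {
        final A key = leftDecoder.decode();
        final B value = rightDecoder.decode();
        return new AbstractMap.SimpleEntry<>(key, value);
      };
    };
  }

  // Encodes all elements with a single encoder instance, then decodes them back.
  static <T> List<T> roundTrip(final EncoderFactory<T> encoderFactory,
                               final DecoderFactory<T> decoderFactory, final List<T> elements) {
    try {
      final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      final Encoder<T> encoder = encoderFactory.create(buffer);
      for (final T element : elements) {
        encoder.encode(element);
      }
      final Decoder<T> decoder = decoderFactory.create(new ByteArrayInputStream(buffer.toByteArray()));
      final List<T> decoded = new ArrayList<>();
      for (int i = 0; i < elements.size(); i++) {
        decoded.add(decoder.decode());
      }
      return decoded;
    } catch (final IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(final String[] args) {
    final List<Map.Entry<Integer, Integer>> input = new ArrayList<>();
    input.add(new AbstractMap.SimpleEntry<>(1, 10));
    input.add(new AbstractMap.SimpleEntry<>(2, 20));
    final List<Map.Entry<Integer, Integer>> output = roundTrip(
        pairEncoderFactory(intEncoderFactory(), intEncoderFactory()),
        pairDecoderFactory(intDecoderFactory(), intDecoderFactory()), input);
    System.out.println(output); // prints [1=10, 2=20]
  }
}
```

Because a factory in this sketch holds no per-stream state, a single factory object can be shared per edge, much as the `register(edgeId, ENCODER_FACTORY, DECODER_FACTORY)` calls in `DataTransferTest` share one pair of factories, while per-stream wrappers such as the `DataOutputStream` above are created once per `create` call rather than once per element.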