nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From johnya...@apache.org
Subject [incubator-nemo] 04/14: q6 success
Date Wed, 17 Oct 2018 01:13:21 GMT
This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch tpch-fix
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git

commit 68ab8efc4464aa3d77f91943bb91ee0f3ddde90e
Author: John Yang <johnyangk@gmail.com>
AuthorDate: Sun Sep 9 18:57:01 2018 +0900

    q6 success
---
 .../compiler/frontend/beam/PipelineTranslator.java | 12 +++++++
 .../frontend/beam/transform/DoTransform.java       | 38 +++++++++++++++++++++-
 .../nemo/examples/beam/GenericSourceSink.java      |  2 +-
 .../org/apache/nemo/examples/beam/tpch/Tpch.java   | 11 ++++---
 .../apache/nemo/examples/beam/SQLTpchITCase.java   |  7 ++--
 5 files changed, 60 insertions(+), 10 deletions(-)

diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 6ef96bf..67b8828 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -117,6 +117,18 @@ public final class PipelineTranslator
   private static void parDoSingleOutputTranslator(final TranslationContext ctx,
                                                   final PrimitiveTransformVertex transformVertex,
                                                   final ParDo.SingleOutput<?, ?> transform) {
+    /*
+    final DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+    final Map<String, DoFnSignature.StateDeclaration> stateDeclarationMap = signature.stateDeclarations();
+    for (final DoFnSignature.StateDeclaration declaration : stateDeclarationMap.values()) {
+      final TypeDescriptor<? extends State> stateType = declaration.stateType();
+      if (stateType.isSubtypeOf(TypeDescriptor.of(ValueState.class))) {
+        declaration.field()
+      } else {
+        throw new UnsupportedOperationException("Only ValueState is supported at the moment");
+      }
+    }
+    */
     final DoTransform doTransform = new DoTransform(transform.getFn(), ctx.pipelineOptions);
     final IRVertex vertex = new OperatorVertex(doTransform);
     ctx.addVertex(vertex);
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoTransform.java
index 3a36ec0..1935370 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoTransform.java
@@ -16,6 +16,7 @@
 package org.apache.nemo.compiler.frontend.beam.transform;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.beam.sdk.state.ValueState;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.nemo.runtime.executor.datatransfer.OutputCollectorImpl;
@@ -35,7 +36,9 @@ import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -200,6 +203,37 @@ public final class DoTransform<I, O> implements Transform<I, O> {
     private final ObjectMapper mapper;
     private final PipelineOptions options;
 
+    private final Map<String, TmpIntegerValueState> idToValueState;
+    private final Map<String, Integer> idToInteger;
+    private class TmpIntegerValueState implements ValueState<Integer> {
+      final String stateId;
+
+      public TmpIntegerValueState(final String stateId) {
+        this.stateId = stateId;
+      }
+
+      @Override
+      public void write(final Integer state) {
+        idToInteger.put(stateId, state);
+      }
+
+      @Nullable
+      @Override
+      public Integer read() {
+        return idToInteger.get(stateId);
+      }
+
+      @Override
+      public ValueState<Integer> readLater() {
+        return this;
+      }
+
+      @Override
+      public void clear() {
+        // Do nothing.
+      }
+    }
+
     /**
      * ProcessContext Constructor.
      *
@@ -222,6 +256,8 @@ public final class DoTransform<I, O> implements Transform<I, O> {
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
+      this.idToValueState = new HashMap<>(1);
+      this.idToInteger = new HashMap<>(1);
     }
 
     /**
@@ -369,7 +405,7 @@ public final class DoTransform<I, O> implements Transform<I, O> {
 
     @Override
     public State state(final String stateId) {
-      throw new UnsupportedOperationException("state() in ProcessContext under DoTransform");
+      return idToValueState.computeIfAbsent(stateId, TmpIntegerValueState::new);
     }
 
     @Override
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
index 51fd3bd..5ac8b20 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
  * Helper class for handling source/sink in a generic way.
  * Assumes String-type PCollections.
  */
-final class GenericSourceSink {
+public final class GenericSourceSink {
   /**
    * Default Constructor.
    */
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
index 4f45920..ad93bd9 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
@@ -26,10 +26,12 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.*;
 import org.apache.commons.csv.CSVFormat;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.examples.beam.GenericSourceSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -250,24 +252,25 @@ public final class Tpch {
     final Pipeline p = Pipeline.create(options);
 
     // Create tables
-    final CSVFormat csvFormat = CSVFormat.MYSQL.withDelimiter('|').withNullString("");
+    final CSVFormat csvFormat = CSVFormat.MYSQL
+      .withDelimiter('|')
+      .withNullString("")
+      .withTrailingDelimiter();
     final PCollectionTuple tables = getHTables(p, csvFormat, inputDirectory);
 
     // Run the TPC-H query
     final PCollection<Row> result = tables.apply(SqlTransform.query(idToQuery.get(queryId)));
 
-    /*
     final PCollection<String> resultToWrite = result.apply(MapElements.into(TypeDescriptors.strings()).via(
       new SerializableFunction<Row, String>() {
         @Override
-        public String apply(Row input) {
+        public String apply(final Row input) {
           System.out.println("row: " + input.getValues());
           return "row: " + input.getValues();
         }
       }));
 
     GenericSourceSink.write(resultToWrite, outputFilePath);
-    */
 
     // Then run
     p.run();
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java
index 0b7668e..427d053 100644
--- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java
@@ -17,6 +17,7 @@ package org.apache.nemo.examples.beam;
 
 import org.apache.nemo.client.JobLauncher;
 import org.apache.nemo.common.test.ArgBuilder;
+import org.apache.nemo.common.test.ExampleTestUtil;
 import org.apache.nemo.examples.beam.policy.DefaultPolicyParallelismFive;
 import org.apache.nemo.examples.beam.tpch.Tpch;
 import org.junit.After;
@@ -36,8 +37,8 @@ public final class SQLTpchITCase {
   private static ArgBuilder builder;
   private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
 
-  private static final String outputFileName = "test_output_simplesql";
-  private static final String expectedOutputFileName = "expected_output_simplesql";
+  private static final String outputFileName = "test_output_tpch";
+  private static final String expectedOutputFileName = "expected_output_tpch";
   private static final String executorResourceFileName = fileBasePath + "beam_test_executor_resources.json";
   private static final String outputFilePath =  fileBasePath + outputFileName;
 
@@ -49,13 +50,11 @@ public final class SQLTpchITCase {
 
   @After
   public void tearDown() throws Exception {
-    /*
     try {
       ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, expectedOutputFileName);
     } finally {
       ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
     }
-    */
   }
 
   @Test (timeout = TIMEOUT)


Mime
View raw message