beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/2] beam git commit: [BEAM-3027] Correctly set output type on SourceId-stripper
Date Mon, 09 Oct 2017 12:58:35 GMT
Repository: beam
Updated Branches:
  refs/heads/master b44056fc6 -> ec5e72403


[BEAM-3027] Correctly set output type on SourceId-stripper


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ca20e69c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ca20e69c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ca20e69c

Branch: refs/heads/master
Commit: ca20e69ce1a817f63600e38a9d4450c2ac3bf949
Parents: b44056f
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Fri Oct 6 14:55:10 2017 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Oct 9 14:57:21 2017 +0200

----------------------------------------------------------------------
 .../beam/runners/flink/FlinkStreamingTransformTranslators.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ca20e69c/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 7cedb56..4d2166c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -188,9 +188,9 @@ class FlinkStreamingTransformTranslators {
         if (transform.getSource().requiresDeduping()) {
           source = nonDedupSource.keyBy(
               new ValueWithRecordIdKeySelector<T>())
-              .transform("debuping", outputTypeInfo, new DedupingOperator<T>());
+              .transform("deduping", outputTypeInfo, new DedupingOperator<T>());
         } else {
-          source = nonDedupSource.flatMap(new StripIdsMap<T>());
+          source = nonDedupSource.flatMap(new StripIdsMap<T>()).returns(outputTypeInfo);
         }
       } catch (Exception e) {
         throw new RuntimeException(


Mime
View raw message