beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [2/7] incubator-beam git commit: Merge branch 'master' into temp-option
Date Thu, 24 Mar 2016 20:42:46 GMT
Merge branch 'master' into temp-option


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/911d2953
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/911d2953
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/911d2953

Branch: refs/heads/master
Commit: 911d29539b8e586bd1452a6ec751155981b0f8f7
Parents: 8bc0659 9247ad7
Author: Pei He <peihe0@gmail.com>
Authored: Wed Mar 23 19:56:01 2016 -0700
Committer: Pei He <peihe0@gmail.com>
Committed: Wed Mar 23 19:56:01 2016 -0700

----------------------------------------------------------------------
 DISCLAIMER                                      |   10 +
 NOTICE                                          |   12 +
 README.md                                       |    9 +-
 examples/pom.xml                                |  129 +-
 .../examples/complete/AutoComplete.java         |   10 +-
 .../examples/MinimalWordCountJava8.java         |   68 --
 .../examples/complete/game/GameStats.java       |  347 ------
 .../examples/complete/game/HourlyTeamScore.java |  193 ---
 .../examples/complete/game/LeaderBoard.java     |  237 ----
 .../dataflow/examples/complete/game/README.md   |  119 --
 .../examples/complete/game/UserScore.java       |  239 ----
 .../complete/game/injector/Injector.java        |  417 -------
 .../complete/game/injector/InjectorUtils.java   |  101 --
 .../injector/RetryHttpInitializerWrapper.java   |  127 --
 .../complete/game/utils/WriteToBigQuery.java    |  134 ---
 .../game/utils/WriteWindowedToBigQuery.java     |   76 --
 .../examples/MinimalWordCountJava8Test.java     |  103 --
 .../examples/complete/game/GameStatsTest.java   |   99 --
 .../complete/game/HourlyTeamScoreTest.java      |  121 --
 .../examples/complete/game/UserScoreTest.java   |  156 ---
 java8examples/pom.xml                           |  278 +++++
 .../examples/MinimalWordCountJava8.java         |   68 ++
 .../examples/complete/game/GameStats.java       |  339 ++++++
 .../examples/complete/game/HourlyTeamScore.java |  193 +++
 .../examples/complete/game/LeaderBoard.java     |  237 ++++
 .../dataflow/examples/complete/game/README.md   |  113 ++
 .../examples/complete/game/UserScore.java       |  239 ++++
 .../complete/game/injector/Injector.java        |  415 +++++++
 .../complete/game/injector/InjectorUtils.java   |  101 ++
 .../injector/RetryHttpInitializerWrapper.java   |  126 ++
 .../complete/game/utils/WriteToBigQuery.java    |  134 +++
 .../game/utils/WriteWindowedToBigQuery.java     |   76 ++
 .../examples/MinimalWordCountJava8Test.java     |  103 ++
 .../examples/complete/game/GameStatsTest.java   |   76 ++
 .../complete/game/HourlyTeamScoreTest.java      |  111 ++
 .../examples/complete/game/UserScoreTest.java   |  154 +++
 java8tests/pom.xml                              |  183 +++
 .../sdk/transforms/CombineJava8Test.java        |  133 +++
 .../sdk/transforms/FilterJava8Test.java         |  118 ++
 .../transforms/FlatMapElementsJava8Test.java    |   84 ++
 .../sdk/transforms/MapElementsJava8Test.java    |   77 ++
 .../sdk/transforms/PartitionJava8Test.java      |   74 ++
 .../transforms/RemoveDuplicatesJava8Test.java   |   98 ++
 .../sdk/transforms/WithKeysJava8Test.java       |   73 ++
 .../sdk/transforms/WithTimestampsJava8Test.java |   65 ++
 maven-archetypes/examples/pom.xml               |    2 +-
 maven-archetypes/starter/pom.xml                |    2 +-
 pom.xml                                         |   24 +-
 runners/flink/README.md                         |  101 +-
 runners/flink/examples/pom.xml                  |   89 ++
 .../beam/runners/flink/examples/TFIDF.java      |  452 +++++++
 .../beam/runners/flink/examples/WordCount.java  |  113 ++
 .../flink/examples/streaming/AutoComplete.java  |  387 ++++++
 .../flink/examples/streaming/JoinExamples.java  |  158 +++
 .../KafkaWindowedWordCountExample.java          |  142 +++
 .../examples/streaming/WindowedWordCount.java   |  130 +++
 runners/flink/pom.xml                           |  416 +++----
 runners/flink/runner/pom.xml                    |  147 +++
 .../FlinkPipelineExecutionEnvironment.java      |  269 +++++
 .../runners/flink/FlinkPipelineOptions.java     |   93 ++
 .../beam/runners/flink/FlinkPipelineRunner.java |  198 ++++
 .../runners/flink/FlinkRunnerRegistrar.java     |   56 +
 .../beam/runners/flink/FlinkRunnerResult.java   |   68 ++
 .../apache/beam/runners/flink/io/ConsoleIO.java |   82 ++
 .../FlinkBatchPipelineTranslator.java           |  153 +++
 .../FlinkBatchTransformTranslators.java         |  594 ++++++++++
 .../FlinkBatchTranslationContext.java           |  129 ++
 .../translation/FlinkPipelineTranslator.java    |   36 +
 .../FlinkStreamingPipelineTranslator.java       |  150 +++
 .../FlinkStreamingTransformTranslators.java     |  407 +++++++
 .../FlinkStreamingTranslationContext.java       |   89 ++
 .../FlinkCoGroupKeyedListAggregator.java        |   60 +
 .../functions/FlinkCreateFunction.java          |   62 +
 .../functions/FlinkDoFnFunction.java            |  204 ++++
 .../FlinkKeyedListAggregationFunction.java      |   77 ++
 .../functions/FlinkMultiOutputDoFnFunction.java |  177 +++
 .../FlinkMultiOutputPruningFunction.java        |   43 +
 .../functions/FlinkPartialReduceFunction.java   |   60 +
 .../functions/FlinkReduceFunction.java          |   57 +
 .../flink/translation/functions/UnionCoder.java |  150 +++
 .../translation/types/CoderComparator.java      |  216 ++++
 .../translation/types/CoderTypeInformation.java |  116 ++
 .../translation/types/CoderTypeSerializer.java  |  152 +++
 .../types/InspectableByteArrayOutputStream.java |   34 +
 .../translation/types/KvCoderComperator.java    |  264 +++++
 .../types/KvCoderTypeInformation.java           |  186 +++
 .../types/VoidCoderTypeSerializer.java          |  112 ++
 .../wrappers/CombineFnAggregatorWrapper.java    |   92 ++
 .../wrappers/DataInputViewWrapper.java          |   59 +
 .../wrappers/DataOutputViewWrapper.java         |   52 +
 .../SerializableFnAggregatorWrapper.java        |   91 ++
 .../translation/wrappers/SinkOutputFormat.java  |  121 ++
 .../translation/wrappers/SourceInputFormat.java |  164 +++
 .../translation/wrappers/SourceInputSplit.java  |   52 +
 .../streaming/FlinkAbstractParDoWrapper.java    |  266 +++++
 .../FlinkGroupAlsoByWindowWrapper.java          |  640 ++++++++++
 .../streaming/FlinkGroupByKeyWrapper.java       |   66 ++
 .../streaming/FlinkParDoBoundMultiWrapper.java  |   77 ++
 .../streaming/FlinkParDoBoundWrapper.java       |  100 ++
 .../io/FlinkStreamingCreateFunction.java        |   65 ++
 .../streaming/io/UnboundedFlinkSource.java      |   82 ++
 .../streaming/io/UnboundedSocketSource.java     |  233 ++++
 .../streaming/io/UnboundedSourceWrapper.java    |  171 +++
 .../state/AbstractFlinkTimerInternals.java      |  128 ++
 .../streaming/state/FlinkStateInternals.java    |  715 ++++++++++++
 .../streaming/state/StateCheckpointReader.java  |   91 ++
 .../streaming/state/StateCheckpointUtils.java   |  155 +++
 .../streaming/state/StateCheckpointWriter.java  |  129 ++
 .../wrappers/streaming/state/StateType.java     |   73 ++
 .../runner/src/main/resources/log4j.properties  |   23 +
 .../apache/beam/runners/flink/AvroITCase.java   |  127 ++
 .../beam/runners/flink/FlattenizeITCase.java    |   74 ++
 .../runners/flink/FlinkRunnerRegistrarTest.java |   48 +
 .../beam/runners/flink/FlinkTestPipeline.java   |   72 ++
 .../beam/runners/flink/JoinExamplesITCase.java  |  101 ++
 .../runners/flink/MaybeEmptyTestITCase.java     |   65 ++
 .../runners/flink/ParDoMultiOutputITCase.java   |  100 ++
 .../beam/runners/flink/ReadSourceITCase.java    |  165 +++
 .../flink/RemoveDuplicatesEmptyITCase.java      |   70 ++
 .../runners/flink/RemoveDuplicatesITCase.java   |   71 ++
 .../beam/runners/flink/SideInputITCase.java     |   69 ++
 .../apache/beam/runners/flink/TfIdfITCase.java  |   78 ++
 .../beam/runners/flink/WordCountITCase.java     |   75 ++
 .../runners/flink/WordCountJoin2ITCase.java     |  138 +++
 .../runners/flink/WordCountJoin3ITCase.java     |  156 +++
 .../beam/runners/flink/WriteSinkITCase.java     |  158 +++
 .../flink/streaming/GroupAlsoByWindowTest.java  |  508 ++++++++
 .../flink/streaming/GroupByNullKeyTest.java     |  123 ++
 .../flink/streaming/StateSerializationTest.java |  305 +++++
 .../streaming/TopWikipediaSessionsITCase.java   |  134 +++
 .../flink/streaming/UnboundedSourceITCase.java  |  210 ++++
 .../beam/runners/flink/util/JoinExamples.java   |  160 +++
 .../src/test/resources/log4j-test.properties    |   27 +
 .../FlinkPipelineExecutionEnvironment.java      |  269 -----
 .../runners/flink/FlinkPipelineOptions.java     |   93 --
 .../beam/runners/flink/FlinkPipelineRunner.java |  206 ----
 .../beam/runners/flink/FlinkRunnerResult.java   |   68 --
 .../beam/runners/flink/examples/TFIDF.java      |  452 -------
 .../beam/runners/flink/examples/WordCount.java  |  113 --
 .../flink/examples/streaming/AutoComplete.java  |  387 ------
 .../flink/examples/streaming/JoinExamples.java  |  158 ---
 .../KafkaWindowedWordCountExample.java          |  143 ---
 .../examples/streaming/WindowedWordCount.java   |  130 ---
 .../apache/beam/runners/flink/io/ConsoleIO.java |   82 --
 .../FlinkBatchPipelineTranslator.java           |  153 ---
 .../FlinkBatchTransformTranslators.java         |  594 ----------
 .../FlinkBatchTranslationContext.java           |  129 --
 .../translation/FlinkPipelineTranslator.java    |   36 -
 .../FlinkStreamingPipelineTranslator.java       |  150 ---
 .../FlinkStreamingTransformTranslators.java     |  406 -------
 .../FlinkStreamingTranslationContext.java       |   89 --
 .../FlinkCoGroupKeyedListAggregator.java        |   60 -
 .../functions/FlinkCreateFunction.java          |   62 -
 .../functions/FlinkDoFnFunction.java            |  204 ----
 .../FlinkKeyedListAggregationFunction.java      |   77 --
 .../functions/FlinkMultiOutputDoFnFunction.java |  177 ---
 .../FlinkMultiOutputPruningFunction.java        |   43 -
 .../functions/FlinkPartialReduceFunction.java   |   60 -
 .../functions/FlinkReduceFunction.java          |   57 -
 .../flink/translation/functions/UnionCoder.java |  150 ---
 .../translation/types/CoderComparator.java      |  216 ----
 .../translation/types/CoderTypeInformation.java |  116 --
 .../translation/types/CoderTypeSerializer.java  |  152 ---
 .../types/InspectableByteArrayOutputStream.java |   34 -
 .../translation/types/KvCoderComperator.java    |  264 -----
 .../types/KvCoderTypeInformation.java           |  186 ---
 .../types/VoidCoderTypeSerializer.java          |  112 --
 .../wrappers/CombineFnAggregatorWrapper.java    |   92 --
 .../wrappers/DataInputViewWrapper.java          |   59 -
 .../wrappers/DataOutputViewWrapper.java         |   52 -
 .../SerializableFnAggregatorWrapper.java        |   91 --
 .../translation/wrappers/SinkOutputFormat.java  |  121 --
 .../translation/wrappers/SourceInputFormat.java |  164 ---
 .../translation/wrappers/SourceInputSplit.java  |   52 -
 .../streaming/FlinkAbstractParDoWrapper.java    |  266 -----
 .../FlinkGroupAlsoByWindowWrapper.java          |  640 ----------
 .../streaming/FlinkGroupByKeyWrapper.java       |   66 --
 .../streaming/FlinkParDoBoundMultiWrapper.java  |   77 --
 .../streaming/FlinkParDoBoundWrapper.java       |  100 --
 .../io/FlinkStreamingCreateFunction.java        |   65 --
 .../streaming/io/UnboundedFlinkSource.java      |   82 --
 .../streaming/io/UnboundedSocketSource.java     |  233 ----
 .../streaming/io/UnboundedSourceWrapper.java    |  134 ---
 .../state/AbstractFlinkTimerInternals.java      |  128 --
 .../streaming/state/FlinkStateInternals.java    |  715 ------------
 .../streaming/state/StateCheckpointReader.java  |   91 --
 .../streaming/state/StateCheckpointUtils.java   |  155 ---
 .../streaming/state/StateCheckpointWriter.java  |  129 --
 .../wrappers/streaming/state/StateType.java     |   73 --
 .../flink/src/main/resources/log4j.properties   |   23 -
 .../apache/beam/runners/flink/AvroITCase.java   |  127 --
 .../beam/runners/flink/FlattenizeITCase.java    |   74 --
 .../beam/runners/flink/FlinkTestPipeline.java   |   72 --
 .../beam/runners/flink/JoinExamplesITCase.java  |  101 --
 .../runners/flink/MaybeEmptyTestITCase.java     |   65 --
 .../runners/flink/ParDoMultiOutputITCase.java   |  100 --
 .../beam/runners/flink/ReadSourceITCase.java    |  165 ---
 .../flink/RemoveDuplicatesEmptyITCase.java      |   70 --
 .../runners/flink/RemoveDuplicatesITCase.java   |   71 --
 .../beam/runners/flink/SideInputITCase.java     |   69 --
 .../apache/beam/runners/flink/TfIdfITCase.java  |   78 --
 .../beam/runners/flink/WordCountITCase.java     |   76 --
 .../runners/flink/WordCountJoin2ITCase.java     |  138 ---
 .../runners/flink/WordCountJoin3ITCase.java     |  156 ---
 .../beam/runners/flink/WriteSinkITCase.java     |  158 ---
 .../flink/streaming/GroupAlsoByWindowTest.java  |  508 --------
 .../flink/streaming/GroupByNullKeyTest.java     |  123 --
 .../flink/streaming/StateSerializationTest.java |  305 -----
 .../streaming/TopWikipediaSessionsITCase.java   |  134 ---
 .../beam/runners/flink/util/JoinExamples.java   |  160 ---
 .../src/test/resources/log4j-test.properties    |   27 -
 runners/pom.xml                                 |   63 +-
 runners/spark/.gitignore                        |   10 -
 runners/spark/.travis.yml                       |   22 -
 runners/spark/README.md                         |  112 +-
 runners/spark/build-resources/checkstyle.xml    |   27 +-
 runners/spark/build-resources/header-file.txt   |   23 +-
 runners/spark/pom.xml                           |  246 ++--
 .../com/cloudera/dataflow/hadoop/HadoopIO.java  |  202 ----
 .../dataflow/hadoop/NullWritableCoder.java      |   71 --
 .../cloudera/dataflow/hadoop/WritableCoder.java |  120 --
 .../com/cloudera/dataflow/io/ConsoleIO.java     |   60 -
 .../com/cloudera/dataflow/io/CreateStream.java  |   66 --
 .../java/com/cloudera/dataflow/io/KafkaIO.java  |  128 --
 .../dataflow/spark/BroadcastHelper.java         |  121 --
 .../com/cloudera/dataflow/spark/ByteArray.java  |   52 -
 .../cloudera/dataflow/spark/CoderHelpers.java   |  185 ---
 .../cloudera/dataflow/spark/DoFnFunction.java   |   93 --
 .../dataflow/spark/EvaluationContext.java       |  283 -----
 .../dataflow/spark/EvaluationResult.java        |   62 -
 .../dataflow/spark/MultiDoFnFunction.java       |  115 --
 .../dataflow/spark/ShardNameBuilder.java        |  106 --
 .../dataflow/spark/ShardNameTemplateAware.java  |   28 -
 .../dataflow/spark/ShardNameTemplateHelper.java |   58 -
 .../dataflow/spark/SparkContextFactory.java     |   66 --
 .../dataflow/spark/SparkPipelineEvaluator.java  |   52 -
 .../dataflow/spark/SparkPipelineOptions.java    |   39 -
 .../spark/SparkPipelineOptionsFactory.java      |   27 -
 .../spark/SparkPipelineOptionsRegistrar.java    |   27 -
 .../dataflow/spark/SparkPipelineRunner.java     |  252 ----
 .../spark/SparkPipelineRunnerRegistrar.java     |   27 -
 .../dataflow/spark/SparkPipelineTranslator.java |   27 -
 .../dataflow/spark/SparkProcessContext.java     |  250 ----
 .../dataflow/spark/SparkRuntimeContext.java     |  212 ----
 .../spark/TemplatedAvroKeyOutputFormat.java     |   40 -
 .../TemplatedSequenceFileOutputFormat.java      |   40 -
 .../spark/TemplatedTextOutputFormat.java        |   40 -
 .../dataflow/spark/TransformEvaluator.java      |   24 -
 .../dataflow/spark/TransformTranslator.java     |  800 -------------
 .../dataflow/spark/WindowingHelpers.java        |   59 -
 .../spark/aggregators/AggAccumParam.java        |   35 -
 .../spark/aggregators/NamedAggregators.java     |  202 ----
 .../SparkStreamingPipelineOptions.java          |   40 -
 .../SparkStreamingPipelineOptionsFactory.java   |   27 -
 .../SparkStreamingPipelineOptionsRegistrar.java |   28 -
 .../streaming/StreamingEvaluationContext.java   |  226 ----
 .../streaming/StreamingTransformTranslator.java |  414 -------
 .../StreamingWindowPipelineDetector.java        |  100 --
 .../beam/runners/spark/EvaluationResult.java    |   65 ++
 .../runners/spark/SparkPipelineOptions.java     |   42 +
 .../beam/runners/spark/SparkPipelineRunner.java |  255 ++++
 .../spark/SparkStreamingPipelineOptions.java    |   41 +
 .../spark/aggregators/AggAccumParam.java        |   38 +
 .../spark/aggregators/NamedAggregators.java     |  205 ++++
 .../beam/runners/spark/coders/CoderHelpers.java |  189 +++
 .../runners/spark/coders/NullWritableCoder.java |   74 ++
 .../runners/spark/coders/WritableCoder.java     |  123 ++
 .../apache/beam/runners/spark/io/ConsoleIO.java |   63 +
 .../beam/runners/spark/io/CreateStream.java     |   69 ++
 .../apache/beam/runners/spark/io/KafkaIO.java   |  131 +++
 .../beam/runners/spark/io/hadoop/HadoopIO.java  |  203 ++++
 .../spark/io/hadoop/ShardNameBuilder.java       |  109 ++
 .../spark/io/hadoop/ShardNameTemplateAware.java |   31 +
 .../io/hadoop/ShardNameTemplateHelper.java      |   61 +
 .../io/hadoop/TemplatedAvroKeyOutputFormat.java |   43 +
 .../TemplatedSequenceFileOutputFormat.java      |   43 +
 .../io/hadoop/TemplatedTextOutputFormat.java    |   43 +
 .../runners/spark/translation/DoFnFunction.java |   97 ++
 .../spark/translation/EvaluationContext.java    |  288 +++++
 .../spark/translation/MultiDoFnFunction.java    |  119 ++
 .../spark/translation/SparkContextFactory.java  |   69 ++
 .../translation/SparkPipelineEvaluator.java     |   56 +
 .../SparkPipelineOptionsFactory.java            |   31 +
 .../SparkPipelineOptionsRegistrar.java          |   31 +
 .../SparkPipelineRunnerRegistrar.java           |   31 +
 .../translation/SparkPipelineTranslator.java    |   30 +
 .../spark/translation/SparkProcessContext.java  |  262 +++++
 .../spark/translation/SparkRuntimeContext.java  |  217 ++++
 .../spark/translation/TransformEvaluator.java   |   27 +
 .../spark/translation/TransformTranslator.java  |  808 +++++++++++++
 .../spark/translation/WindowingHelpers.java     |   62 +
 .../SparkStreamingPipelineOptionsFactory.java   |   31 +
 .../SparkStreamingPipelineOptionsRegistrar.java |   32 +
 .../streaming/StreamingEvaluationContext.java   |  229 ++++
 .../streaming/StreamingTransformTranslator.java |  418 +++++++
 .../StreamingWindowPipelineDetector.java        |  104 ++
 .../runners/spark/util/BroadcastHelper.java     |  125 ++
 .../beam/runners/spark/util/ByteArray.java      |   55 +
 ...ataflow.sdk.options.PipelineOptionsRegistrar |    4 +-
 ...dataflow.sdk.runners.PipelineRunnerRegistrar |    2 +-
 .../dataflow/hadoop/WritableCoderTest.java      |   42 -
 .../dataflow/spark/AvroPipelineTest.java        |  103 --
 .../dataflow/spark/CombineGloballyTest.java     |   87 --
 .../dataflow/spark/CombinePerKeyTest.java       |   69 --
 .../com/cloudera/dataflow/spark/DeDupTest.java  |   55 -
 .../cloudera/dataflow/spark/DoFnOutputTest.java |   57 -
 .../cloudera/dataflow/spark/EmptyInputTest.java |   64 -
 .../spark/HadoopFileFormatPipelineTest.java     |  105 --
 .../spark/MultiOutputWordCountTest.java         |  148 ---
 .../cloudera/dataflow/spark/NumShardsTest.java  |   89 --
 .../dataflow/spark/SerializationTest.java       |  183 ---
 .../dataflow/spark/ShardNameBuilderTest.java    |   82 --
 .../dataflow/spark/SideEffectsTest.java         |   77 --
 .../dataflow/spark/SimpleWordCountTest.java     |  117 --
 .../spark/TestSparkPipelineOptionsFactory.java  |   34 -
 .../com/cloudera/dataflow/spark/TfIdfTest.java  |   60 -
 .../dataflow/spark/TransformTranslatorTest.java |   95 --
 .../dataflow/spark/WindowedWordCountTest.java   |   63 -
 .../spark/streaming/FlattenStreamingTest.java   |   84 --
 .../spark/streaming/KafkaStreamingTest.java     |  133 ---
 .../streaming/SimpleStreamingWordCountTest.java |   73 --
 .../utils/DataflowAssertStreaming.java          |   39 -
 .../streaming/utils/EmbeddedKafkaCluster.java   |  314 -----
 .../apache/beam/runners/spark/DeDupTest.java    |   60 +
 .../beam/runners/spark/EmptyInputTest.java      |   69 ++
 .../beam/runners/spark/SimpleWordCountTest.java |  115 ++
 .../apache/beam/runners/spark/TfIdfTest.java    |   64 +
 .../runners/spark/coders/WritableCoderTest.java |   45 +
 .../beam/runners/spark/io/AvroPipelineTest.java |  108 ++
 .../beam/runners/spark/io/NumShardsTest.java    |   96 ++
 .../io/hadoop/HadoopFileFormatPipelineTest.java |  113 ++
 .../spark/io/hadoop/ShardNameBuilderTest.java   |   85 ++
 .../spark/translation/CombineGloballyTest.java  |   94 ++
 .../spark/translation/CombinePerKeyTest.java    |   70 ++
 .../spark/translation/DoFnOutputTest.java       |   64 +
 .../translation/MultiOutputWordCountTest.java   |  137 +++
 .../spark/translation/SerializationTest.java    |  183 +++
 .../spark/translation/SideEffectsTest.java      |   81 ++
 .../TestSparkPipelineOptionsFactory.java        |   38 +
 .../translation/TransformTranslatorTest.java    |   99 ++
 .../translation/WindowedWordCountTest.java      |   71 ++
 .../streaming/FlattenStreamingTest.java         |   88 ++
 .../streaming/KafkaStreamingTest.java           |  140 +++
 .../streaming/SimpleStreamingWordCountTest.java |   77 ++
 .../utils/DataflowAssertStreaming.java          |   42 +
 .../streaming/utils/EmbeddedKafkaCluster.java   |  317 +++++
 sdk/pom.xml                                     |   82 +-
 .../sdk/coders/protobuf/package-info.java       |   23 +
 .../dataflow/sdk/io/bigtable/BigtableIO.java    |    4 +-
 .../dataflow/sdk/io/bigtable/package-info.java  |   22 +
 .../dataflow/sdk/options/PipelineOptions.java   |    3 +-
 .../sdk/options/PipelineOptionsFactory.java     |   72 +-
 .../sdk/runners/DataflowPipelineRunner.java     |   25 +-
 .../sdk/runners/DataflowPipelineTranslator.java |   15 +
 .../inprocess/BoundedReadEvaluatorFactory.java  |   50 +-
 .../CachedThreadPoolExecutorServiceFactory.java |   42 +
 .../runners/inprocess/CompletionCallback.java   |   33 +
 .../ConsumerTrackingPipelineVisitor.java        |  173 +++
 .../sdk/runners/inprocess/EvaluatorKey.java     |    1 -
 .../inprocess/ExecutorServiceFactory.java       |   32 +
 .../ExecutorServiceParallelExecutor.java        |  432 +++++++
 .../inprocess/FlattenEvaluatorFactory.java      |    7 +-
 .../inprocess/GroupByKeyEvaluatorFactory.java   |   10 +-
 .../inprocess/InMemoryWatermarkManager.java     |   30 +-
 .../sdk/runners/inprocess/InProcessBundle.java  |   20 +-
 .../inprocess/InProcessEvaluationContext.java   |  405 +++++++
 .../runners/inprocess/InProcessExecutor.java    |   46 +
 .../inprocess/InProcessPipelineOptions.java     |   68 +-
 .../inprocess/InProcessPipelineRunner.java      |  319 +++--
 .../inprocess/InProcessSideInputContainer.java  |   71 +-
 .../inprocess/KeyedPValueTrackingVisitor.java   |   95 ++
 .../inprocess/ParDoMultiEvaluatorFactory.java   |    6 +-
 .../inprocess/ParDoSingleEvaluatorFactory.java  |    6 +-
 .../sdk/runners/inprocess/StepAndKey.java       |   68 ++
 .../inprocess/TransformEvaluatorFactory.java    |    1 -
 .../inprocess/TransformEvaluatorRegistry.java   |   72 ++
 .../runners/inprocess/TransformExecutor.java    |  114 ++
 .../inprocess/TransformExecutorService.java     |   34 +
 .../inprocess/TransformExecutorServices.java    |  153 +++
 .../UnboundedReadEvaluatorFactory.java          |   54 +-
 .../runners/inprocess/ViewEvaluatorFactory.java |    8 +-
 .../inprocess/WatermarkCallbackExecutor.java    |  143 +++
 .../cloud/dataflow/sdk/transforms/Combine.java  |   18 +-
 .../dataflow/sdk/transforms/CombineFns.java     | 1100 ++++++++++++++++++
 .../cloud/dataflow/sdk/transforms/DoFn.java     |   13 +-
 .../dataflow/sdk/transforms/DoFnReflector.java  |    7 +-
 .../dataflow/sdk/transforms/PTransform.java     |   14 +-
 .../cloud/dataflow/sdk/transforms/ParDo.java    |   13 +
 .../sdk/transforms/display/DisplayData.java     |  530 +++++++++
 .../sdk/transforms/display/HasDisplayData.java  |   53 +
 .../transforms/windowing/AfterWatermark.java    |    4 +-
 .../cloud/dataflow/sdk/util/DoFnRunners.java    |    4 +-
 .../cloud/dataflow/sdk/util/PropertyNames.java  |    2 +
 .../sdk/options/PipelineOptionsFactoryTest.java |   75 +-
 .../runners/DataflowPipelineTranslatorTest.java |   98 +-
 .../BoundedReadEvaluatorFactoryTest.java        |  138 ++-
 .../ConsumerTrackingPipelineVisitorTest.java    |  233 ++++
 .../inprocess/FlattenEvaluatorFactoryTest.java  |    1 -
 .../GroupByKeyEvaluatorFactoryTest.java         |    1 -
 .../inprocess/InMemoryWatermarkManagerTest.java |   12 +
 .../InProcessEvaluationContextTest.java         |  544 +++++++++
 .../inprocess/InProcessPipelineRunnerTest.java  |   77 ++
 .../InProcessSideInputContainerTest.java        |   92 +-
 .../KeyedPValueTrackingVisitorTest.java         |  189 +++
 .../ParDoMultiEvaluatorFactoryTest.java         |    1 -
 .../ParDoSingleEvaluatorFactoryTest.java        |    1 -
 .../TransformExecutorServicesTest.java          |  134 +++
 .../inprocess/TransformExecutorTest.java        |  312 +++++
 .../UnboundedReadEvaluatorFactoryTest.java      |  169 ++-
 .../inprocess/ViewEvaluatorFactoryTest.java     |    1 -
 .../WatermarkCallbackExecutorTest.java          |  126 ++
 .../dataflow/sdk/transforms/CombineFnsTest.java |  413 +++++++
 .../cloud/dataflow/sdk/transforms/DoFnTest.java |   15 +
 .../dataflow/sdk/transforms/PTransformTest.java |   41 +
 .../dataflow/sdk/transforms/ParDoTest.java      |   23 +
 .../transforms/display/DisplayDataMatchers.java |   98 ++
 .../display/DisplayDataMatchersTest.java        |   81 ++
 .../sdk/transforms/display/DisplayDataTest.java |  633 ++++++++++
 .../cloud/dataflow/sdk/util/ApiSurfaceTest.java |    3 +-
 .../PipelineOptionsFactoryJava8Test.java        |   90 ++
 .../sdk/transforms/CombineJava8Test.java        |  133 ---
 .../sdk/transforms/FilterJava8Test.java         |  118 --
 .../transforms/FlatMapElementsJava8Test.java    |   84 --
 .../sdk/transforms/MapElementsJava8Test.java    |   77 --
 .../sdk/transforms/PartitionJava8Test.java      |   74 --
 .../transforms/RemoveDuplicatesJava8Test.java   |   99 --
 .../sdk/transforms/WithKeysJava8Test.java       |   74 --
 .../sdk/transforms/WithTimestampsJava8Test.java |   66 --
 428 files changed, 31664 insertions(+), 23375 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/911d2953/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
----------------------------------------------------------------------
diff --cc runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
index 0000000,e115a15..b413d7a
mode 000000,100644..100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
@@@ -1,0 -1,631 +1,640 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.beam.runners.flink.translation.wrappers.streaming;
+ 
+ import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+ import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
+ import org.apache.beam.runners.flink.translation.wrappers.streaming.state.*;
+ import com.google.cloud.dataflow.sdk.coders.*;
+ import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+ import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
+ import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+ import com.google.cloud.dataflow.sdk.transforms.Combine;
+ import com.google.cloud.dataflow.sdk.transforms.DoFn;
+ import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+ import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;
+ import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+ import com.google.cloud.dataflow.sdk.util.*;
+ import com.google.cloud.dataflow.sdk.values.*;
+ import com.google.common.base.Preconditions;
+ import com.google.common.collect.HashMultimap;
+ import com.google.common.collect.Multimap;
+ import org.apache.flink.api.common.accumulators.Accumulator;
+ import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+ import org.apache.flink.core.memory.DataInputView;
+ import org.apache.flink.runtime.state.AbstractStateBackend;
+ import org.apache.flink.runtime.state.StateHandle;
+ import org.apache.flink.streaming.api.datastream.DataStream;
+ import org.apache.flink.streaming.api.datastream.KeyedStream;
+ import org.apache.flink.streaming.api.operators.*;
+ import org.apache.flink.streaming.api.watermark.Watermark;
+ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+ import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+ import org.joda.time.Instant;
+ 
+ import java.io.IOException;
+ import java.util.*;
+ 
+ /**
+  * This class is the key class implementing all the windowing/triggering logic of Apache Beam.
+  * To provide full compatibility and support for all the windowing/triggering combinations offered by
+  * Beam, we opted for a strategy that uses the SDK's code for doing these operations. See the code in
+  * {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn}.
+  * <p/>
+  * In a nutshell, when execution arrives at this operator, we expect to have a stream <b>already
+  * grouped by key</b>. Each of the elements that enters here registers a timer
+  * (see {@link TimerInternals#setTimer(TimerInternals.TimerData)}) in the
+  * {@link FlinkGroupAlsoByWindowWrapper#activeTimers}.
+  * This is essentially a timestamp indicating when to trigger the computation over the window this
+  * element belongs to.
+  * <p/>
+  * When a watermark arrives, all the registered timers are checked to see which ones are ready to
+  * fire (see {@link FlinkGroupAlsoByWindowWrapper#processWatermark(Watermark)}). These are deregistered from
+  * the {@link FlinkGroupAlsoByWindowWrapper#activeTimers}
+  * list, and are fed into the {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn}
+  * for further processing.
+  */
+ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
+     extends AbstractStreamOperator<WindowedValue<KV<K, VOUT>>>
+     implements OneInputStreamOperator<WindowedValue<KV<K, VIN>>, WindowedValue<KV<K, VOUT>>> {
+ 
+   private static final long serialVersionUID = 1L;
+ 
+   private transient PipelineOptions options;
+ 
+   private transient CoderRegistry coderRegistry;
+ 
+   private DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> operator;
+ 
+   private ProcessContext context;
+ 
+   private final WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy;
+ 
+   private final Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combineFn;
+ 
+   private final KvCoder<K, VIN> inputKvCoder;
+ 
+   /**
+    * State is kept <b>per-key</b>. This data structure keeps this mapping between an active key, i.e. a
+    * key whose elements are currently waiting to be processed, and its associated state.
+    */
+   private Map<K, FlinkStateInternals<K>> perKeyStateInternals = new HashMap<>();
+ 
+   /**
+    * Timers waiting to be processed.
+    */
+   private Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
+ 
+   private FlinkTimerInternals timerInternals = new FlinkTimerInternals();
+ 
+   /**
+    * Creates a DataStream where elements are grouped in windows based on the specified windowing strategy.
+    * This method assumes that <b>elements are already grouped by key</b>.
+    * <p/>
+    * The difference with {@link #createForIterable(PipelineOptions, PCollection, KeyedStream)}
+    * is that this method assumes that a combiner function is provided
+    * (see {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}).
+    * A combiner helps increase the speed and, in most cases, reduces the per-window state.
+    *
+    * @param options            the general job configuration options.
+    * @param input              the input Dataflow {@link com.google.cloud.dataflow.sdk.values.PCollection}.
+    * @param groupedStreamByKey the input stream, it is assumed to already be grouped by key.
+    * @param combiner           the combiner to be used.
+    * @param outputKvCoder      the type of the output values.
+    */
+   public static <K, VIN, VACC, VOUT> DataStream<WindowedValue<KV<K, VOUT>>> create(
+       PipelineOptions options,
+       PCollection input,
+       KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey,
+       Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner,
+       KvCoder<K, VOUT> outputKvCoder) {
+     Preconditions.checkNotNull(options);
+ 
+     KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder();
+     FlinkGroupAlsoByWindowWrapper windower = new FlinkGroupAlsoByWindowWrapper<>(options,
+         input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), inputKvCoder, combiner);
+ 
+     Coder<WindowedValue<KV<K, VOUT>>> windowedOutputElemCoder = WindowedValue.FullWindowedValueCoder.of(
+         outputKvCoder,
+         input.getWindowingStrategy().getWindowFn().windowCoder());
+ 
+     CoderTypeInformation<WindowedValue<KV<K, VOUT>>> outputTypeInfo =
+         new CoderTypeInformation<>(windowedOutputElemCoder);
+ 
+     DataStream<WindowedValue<KV<K, VOUT>>> groupedByKeyAndWindow = groupedStreamByKey
+         .transform("GroupByWindowWithCombiner",
+             new CoderTypeInformation<>(outputKvCoder),
+             windower)
+         .returns(outputTypeInfo);
+ 
+     return groupedByKeyAndWindow;
+   }
+ 
+   /**
+    * Creates a DataStream where elements are grouped in windows based on the specified windowing strategy.
+    * This method assumes that <b>elements are already grouped by key</b>.
+    * <p/>
+    * The difference with {@link #create(PipelineOptions, PCollection, KeyedStream, Combine.KeyedCombineFn, KvCoder)}
+    * is that this method assumes no combiner function
+    * (see {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}).
+    *
+    * @param options            the general job configuration options.
+    * @param input              the input Dataflow {@link com.google.cloud.dataflow.sdk.values.PCollection}.
+    * @param groupedStreamByKey the input stream, it is assumed to already be grouped by key.
+    */
+   public static <K, VIN> DataStream<WindowedValue<KV<K, Iterable<VIN>>>> createForIterable(
+       PipelineOptions options,
+       PCollection input,
+       KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey) {
+     Preconditions.checkNotNull(options);
+ 
+     KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder();
+     Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+     Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder();
+ 
+     FlinkGroupAlsoByWindowWrapper windower = new FlinkGroupAlsoByWindowWrapper(options,
+         input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), inputKvCoder, null);
+ 
+     Coder<Iterable<VIN>> valueIterCoder = IterableCoder.of(inputValueCoder);
+     KvCoder<K, Iterable<VIN>> outputElemCoder = KvCoder.of(keyCoder, valueIterCoder);
+ 
+     Coder<WindowedValue<KV<K, Iterable<VIN>>>> windowedOutputElemCoder = WindowedValue.FullWindowedValueCoder.of(
+         outputElemCoder,
+         input.getWindowingStrategy().getWindowFn().windowCoder());
+ 
+     CoderTypeInformation<WindowedValue<KV<K, Iterable<VIN>>>> outputTypeInfo =
+         new CoderTypeInformation<>(windowedOutputElemCoder);
+ 
+     DataStream<WindowedValue<KV<K, Iterable<VIN>>>> groupedByKeyAndWindow = groupedStreamByKey
+         .transform("GroupByWindow",
+             new CoderTypeInformation<>(windowedOutputElemCoder),
+             windower)
+         .returns(outputTypeInfo);
+ 
+     return groupedByKeyAndWindow;
+   }
+ 
+   public static <K, VIN, VACC, VOUT> FlinkGroupAlsoByWindowWrapper
+   createForTesting(PipelineOptions options,
+                    CoderRegistry registry,
+                    WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy,
+                    KvCoder<K, VIN> inputCoder,
+                    Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) {
+     Preconditions.checkNotNull(options);
+ 
+     return new FlinkGroupAlsoByWindowWrapper(options, registry, windowingStrategy, inputCoder, combiner);
+   }
+ 
+   private FlinkGroupAlsoByWindowWrapper(PipelineOptions options,
+                                         CoderRegistry registry,
+                                         WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy,
+                                         KvCoder<K, VIN> inputCoder,
+                                         Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) {
+     Preconditions.checkNotNull(options);
+ 
+     this.options = Preconditions.checkNotNull(options);
+     this.coderRegistry = Preconditions.checkNotNull(registry);
+     this.inputKvCoder = Preconditions.checkNotNull(inputCoder);//(KvCoder<K, VIN>) input.getCoder();
+     this.windowingStrategy = Preconditions.checkNotNull(windowingStrategy);//input.getWindowingStrategy();
+     this.combineFn = combiner;
+     this.operator = createGroupAlsoByWindowOperator();
+     this.chainingStrategy = ChainingStrategy.ALWAYS;
+   }
+ 
+   @Override
+   public void open() throws Exception {
+     super.open();
+     this.context = new ProcessContext(operator, new TimestampedCollector<>(output), this.timerInternals);
+   }
+ 
+   /**
+    * Create the adequate {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn},
+    * <b> if not already created</b>.
+    * If a {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn} was provided, then
+    * a function with that combiner is created, so that elements are combined as they arrive. This is
+    * done for speed and (in most of the cases) for reduction of the per-window state.
+    */
+   private <W extends BoundedWindow> DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> createGroupAlsoByWindowOperator() {
+     if (this.operator == null) {
+       if (this.combineFn == null) {
+         // Thus VOUT == Iterable<VIN>
+         Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder();
+ 
+         this.operator = (DoFn) GroupAlsoByWindowViaWindowSetDoFn.create(
+             (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, W>buffering(inputValueCoder));
+       } else {
+         Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder();
+ 
+         AppliedCombineFn<K, VIN, VACC, VOUT> appliedCombineFn = AppliedCombineFn
+             .withInputCoder(combineFn, coderRegistry, inputKvCoder);
+ 
+         this.operator = GroupAlsoByWindowViaWindowSetDoFn.create(
+             (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, VACC, VOUT, W>combining(inputKeyCoder, appliedCombineFn));
+       }
+     }
+     return this.operator;
+   }
+ 
+   private void processKeyedWorkItem(KeyedWorkItem<K, VIN> workItem) throws Exception {
+     context.setElement(workItem, getStateInternalsForKey(workItem.key()));
+ 
+     // TODO: Ideally startBundle/finishBundle would be called when the operator is first used / about to be discarded.
+     operator.startBundle(context);
+     operator.processElement(context);
+     operator.finishBundle(context);
+   }
+ 
+   @Override
+   public void processElement(StreamRecord<WindowedValue<KV<K, VIN>>> element) throws Exception {
+     ArrayList<WindowedValue<VIN>> elements = new ArrayList<>();
+     elements.add(WindowedValue.of(element.getValue().getValue().getValue(), element.getValue().getTimestamp(),
+         element.getValue().getWindows(), element.getValue().getPane()));
+     processKeyedWorkItem(KeyedWorkItems.elementsWorkItem(element.getValue().getValue().getKey(), elements));
+   }
+ 
+   @Override
+   public void processWatermark(Watermark mark) throws Exception {
+     context.setCurrentInputWatermark(new Instant(mark.getTimestamp()));
+ 
+     Multimap<K, TimerInternals.TimerData> timers = getTimersReadyToProcess(mark.getTimestamp());
+     if (!timers.isEmpty()) {
+       for (K key : timers.keySet()) {
+         processKeyedWorkItem(KeyedWorkItems.<K, VIN>timersWorkItem(key, timers.get(key)));
+       }
+     }
+ 
+     /**
+      * This is to take into account the different semantics of the Watermark in Flink and
+      * in Dataflow. To understand the reasoning behind the Dataflow semantics and its
+      * watermark holding logic, see the documentation of
+      * {@link WatermarkHold#addHold(ReduceFn.ProcessValueContext, boolean)}
+      * */
+     long millis = Long.MAX_VALUE;
+     for (FlinkStateInternals state : perKeyStateInternals.values()) {
+       Instant watermarkHold = state.getWatermarkHold();
+       if (watermarkHold != null && watermarkHold.getMillis() < millis) {
+         millis = watermarkHold.getMillis();
+       }
+     }
+ 
+     if (mark.getTimestamp() < millis) {
+       millis = mark.getTimestamp();
+     }
+ 
+     context.setCurrentOutputWatermark(new Instant(millis));
+ 
+     // Don't forget to re-emit the watermark for further operators down the line.
+     // This is critical for jobs with multiple aggregation steps.
+     // Imagine a job with a groupByKey() on key K1, followed by a map() that changes
+     // the key K1 to K2, and another groupByKey() on K2. In this case, if the watermark
+     // is not re-emitted, the second aggregation would never be triggered, and no result
+     // will be produced.
+     output.emitWatermark(new Watermark(millis));
+   }
+ 
+   @Override
+   public void close() throws Exception {
+     super.close();
+   }
+ 
+   private void registerActiveTimer(K key, TimerInternals.TimerData timer) {
+     Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key);
+     if (timersForKey == null) {
+       timersForKey = new HashSet<>();
+     }
+     timersForKey.add(timer);
+     activeTimers.put(key, timersForKey);
+   }
+ 
+   private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) {
+     Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key);
+     if (timersForKey != null) {
+       timersForKey.remove(timer);
+       if (timersForKey.isEmpty()) {
+         activeTimers.remove(key);
+       } else {
+         activeTimers.put(key, timersForKey);
+       }
+     }
+   }
+ 
+   /**
+    * Returns the list of timers that are ready to fire. These are the timers
+    * that are registered to be triggered at a time before the current watermark.
+    * We keep these timers in a Set, so that they are deduplicated, as the same
+    * timer can be registered multiple times.
+    */
+   private Multimap<K, TimerInternals.TimerData> getTimersReadyToProcess(long currentWatermark) {
+ 
+     // we keep the timers to return in a different list and launch them later
+     // because we cannot prevent a trigger from registering another trigger,
+     // which would lead to concurrent modification exception.
+     Multimap<K, TimerInternals.TimerData> toFire = HashMultimap.create();
+ 
+     Iterator<Map.Entry<K, Set<TimerInternals.TimerData>>> it = activeTimers.entrySet().iterator();
+     while (it.hasNext()) {
+       Map.Entry<K, Set<TimerInternals.TimerData>> keyWithTimers = it.next();
+ 
+       Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator();
+       while (timerIt.hasNext()) {
+         TimerInternals.TimerData timerData = timerIt.next();
+         if (timerData.getTimestamp().isBefore(currentWatermark)) {
+           toFire.put(keyWithTimers.getKey(), timerData);
+           timerIt.remove();
+         }
+       }
+ 
+       if (keyWithTimers.getValue().isEmpty()) {
+         it.remove();
+       }
+     }
+     return toFire;
+   }
+ 
+   /**
+    * Gets the state associated with the specified key.
+    *
+    * @param key the key whose state we want.
+    * @return The {@link FlinkStateInternals}
+    * associated with that key.
+    */
+   private FlinkStateInternals<K> getStateInternalsForKey(K key) {
+     FlinkStateInternals<K> stateInternals = perKeyStateInternals.get(key);
+     if (stateInternals == null) {
+       Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
+       OutputTimeFn<? super BoundedWindow> outputTimeFn = this.windowingStrategy.getWindowFn().getOutputTimeFn();
+       stateInternals = new FlinkStateInternals<>(key, inputKvCoder.getKeyCoder(), windowCoder, outputTimeFn);
+       perKeyStateInternals.put(key, stateInternals);
+     }
+     return stateInternals;
+   }
+ 
+   private class FlinkTimerInternals extends AbstractFlinkTimerInternals<K, VIN> {
+     @Override
+     public void setTimer(TimerData timerKey) {
+       registerActiveTimer(context.element().key(), timerKey);
+     }
+ 
+     @Override
+     public void deleteTimer(TimerData timerKey) {
+       unregisterActiveTimer(context.element().key(), timerKey);
+     }
+   }
+ 
+   private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, VIN, VOUT, ?, KeyedWorkItem<K, VIN>>.ProcessContext {
+ 
+     private final FlinkTimerInternals timerInternals;
+ 
+     private final TimestampedCollector<WindowedValue<KV<K, VOUT>>> collector;
+ 
+     private FlinkStateInternals<K> stateInternals;
+ 
+     private KeyedWorkItem<K, VIN> element;
+ 
+     public ProcessContext(DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> function,
+                           TimestampedCollector<WindowedValue<KV<K, VOUT>>> outCollector,
+                           FlinkTimerInternals timerInternals) {
+       function.super();
+       super.setupDelegateAggregators();
+ 
+       this.collector = Preconditions.checkNotNull(outCollector);
+       this.timerInternals = Preconditions.checkNotNull(timerInternals);
+     }
+ 
+     public void setElement(KeyedWorkItem<K, VIN> element,
+                            FlinkStateInternals<K> stateForKey) {
+       this.element = element;
+       this.stateInternals = stateForKey;
+     }
+ 
+     public void setCurrentInputWatermark(Instant watermark) {
+       this.timerInternals.setCurrentInputWatermark(watermark);
+     }
+ 
+     public void setCurrentOutputWatermark(Instant watermark) {
+       this.timerInternals.setCurrentOutputWatermark(watermark);
+     }
+ 
+     @Override
+     public KeyedWorkItem<K, VIN> element() {
+       return this.element;
+     }
+ 
+     @Override
+     public Instant timestamp() {
+       throw new UnsupportedOperationException("timestamp() is not available when processing KeyedWorkItems.");
+     }
+ 
+     @Override
+     public PipelineOptions getPipelineOptions() {
+       // TODO: PipelineOptions need to be available on the workers.
+       // Ideally they are captured as part of the pipeline.
+       // For now, construct empty options so that StateContexts.createFromComponents
+       // will yield a valid StateContext, which is needed to support the StateContext.window().
+       if (options == null) {
+         options = new PipelineOptions() {
+           @Override
+           public <T extends PipelineOptions> T as(Class<T> kls) {
+             return null;
+           }
+ 
+           @Override
+           public <T extends PipelineOptions> T cloneAs(Class<T> kls) {
+             return null;
+           }
+ 
+           @Override
+           public Class<? extends PipelineRunner<?>> getRunner() {
+             return null;
+           }
+ 
+           @Override
+           public void setRunner(Class<? extends PipelineRunner<?>> kls) {
+ 
+           }
+ 
+           @Override
+           public CheckEnabled getStableUniqueNames() {
+             return null;
+           }
+ 
+           @Override
+           public void setStableUniqueNames(CheckEnabled enabled) {
+           }
++
++          @Override
++          public String getTempLocation() {
++            return null;
++          }
++
++          @Override
++          public void setTempLocation(String tempLocation) {
++          }
+         };
+       }
+       return options;
+     }
+ 
+     @Override
+     public void output(KV<K, VOUT> output) {
+       throw new UnsupportedOperationException(
+           "output() is not available when processing KeyedWorkItems.");
+     }
+ 
+     @Override
+     public void outputWithTimestamp(KV<K, VOUT> output, Instant timestamp) {
+       throw new UnsupportedOperationException(
+           "outputWithTimestamp() is not available when processing KeyedWorkItems.");
+     }
+ 
+     @Override
+     public PaneInfo pane() {
+       throw new UnsupportedOperationException("pane() is not available when processing KeyedWorkItems.");
+     }
+ 
+     @Override
+     public BoundedWindow window() {
+       throw new UnsupportedOperationException(
+           "window() is not available when processing KeyedWorkItems.");
+     }
+ 
+     @Override
+     public WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>> windowingInternals() {
+       return new WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>>() {
+ 
+         @Override
+         public com.google.cloud.dataflow.sdk.util.state.StateInternals stateInternals() {
+           return stateInternals;
+         }
+ 
+         @Override
+         public void outputWindowedValue(KV<K, VOUT> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+           // TODO: No need to represent timestamp twice.
+           collector.setAbsoluteTimestamp(timestamp.getMillis());
+           collector.collect(WindowedValue.of(output, timestamp, windows, pane));
+ 
+         }
+ 
+         @Override
+         public TimerInternals timerInternals() {
+           return timerInternals;
+         }
+ 
+         @Override
+         public Collection<? extends BoundedWindow> windows() {
+           throw new UnsupportedOperationException("windows() is not available in Streaming mode.");
+         }
+ 
+         @Override
+         public PaneInfo pane() {
+           throw new UnsupportedOperationException("pane() is not available in Streaming mode.");
+         }
+ 
+         @Override
+         public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
+           throw new RuntimeException("writePCollectionViewData() not available in Streaming mode.");
+         }
+ 
+         @Override
+         public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+           throw new RuntimeException("sideInput() is not available in Streaming mode.");
+         }
+       };
+     }
+ 
+     @Override
+     public <T> T sideInput(PCollectionView<T> view) {
+       throw new RuntimeException("sideInput() is not supported in Streaming mode.");
+     }
+ 
+     @Override
+     public <T> void sideOutput(TupleTag<T> tag, T output) {
+       // Side outputs are rejected, not ignored: this can happen when a user does not register
+       // side outputs but then outputs using a freshly created TupleTag.
+       throw new RuntimeException("sideOutput() is not available when grouping by window.");
+     }
+ 
+     @Override
+     public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+       sideOutput(tag, output);
+     }
+ 
+     @Override
+     protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
+       Accumulator acc = getRuntimeContext().getAccumulator(name);
+       if (acc != null) {
+         AccumulatorHelper.compareAccumulatorTypes(name,
+             SerializableFnAggregatorWrapper.class, acc.getClass());
+         return (Aggregator<AggInputT, AggOutputT>) acc;
+       }
+ 
+       SerializableFnAggregatorWrapper<AggInputT, AggOutputT> accumulator =
+           new SerializableFnAggregatorWrapper<>(combiner);
+       getRuntimeContext().addAccumulator(name, accumulator);
+       return accumulator;
+     }
+   }
+ 
+   //////////////        Checkpointing implementation        ////////////////
+ 
+   @Override
+   public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+     StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
+     AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+     StateCheckpointWriter writer = StateCheckpointWriter.create(out);
+     Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+ 
+     // checkpoint the timers
+     StateCheckpointUtils.encodeTimers(activeTimers, writer, keyCoder);
+ 
+     // checkpoint the state
+     StateCheckpointUtils.encodeState(perKeyStateInternals, writer, keyCoder);
+ 
+     // checkpoint the timerInternals
+     context.timerInternals.encodeTimerInternals(context, writer,
+         inputKvCoder, windowingStrategy.getWindowFn().windowCoder());
+ 
+     taskState.setOperatorState(out.closeAndGetHandle());
+     return taskState;
+   }
+ 
+   @Override
+   public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception {
+     super.restoreState(taskState, recoveryTimestamp);
+ 
+     final ClassLoader userClassloader = getUserCodeClassloader();
+ 
+     Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
+     Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+ 
+     @SuppressWarnings("unchecked")
+     StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
+     DataInputView in = inputState.getState(userClassloader);
+     StateCheckpointReader reader = new StateCheckpointReader(in);
+ 
+     // restore the timers
+     this.activeTimers = StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder);
+ 
+     // restore the state
+     this.perKeyStateInternals = StateCheckpointUtils.decodeState(
+         reader, windowingStrategy.getOutputTimeFn(), keyCoder, windowCoder, userClassloader);
+ 
+     // restore the timerInternals.
+     this.timerInternals.restoreTimerInternals(reader, inputKvCoder, windowCoder);
+   }
 -}
++}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/911d2953/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/PipelineOptions.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/911d2953/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
----------------------------------------------------------------------


Mime
View raw message