flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [2/2] flink git commit: [hotfix] [docs] Fix Scala code snippets in docs.
Date Thu, 04 Jan 2018 13:49:32 GMT
[hotfix] [docs] Fix Scala code snippets in docs.

* remove unneeded semi-colons
* add `()` to `print` method
    * typically, methods with some side-effects are invoked with `()`
* fix a few misc issues

This closes #5221.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff8b2098
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff8b2098
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff8b2098

Branch: refs/heads/release-1.4
Commit: ff8b2098b206f5e95fadde3dec5f1b09df95f9bb
Parents: 5cf0f57
Author: okumin <mail@okumin.com>
Authored: Mon Jan 1 15:25:54 2018 +0900
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Thu Jan 4 14:49:01 2018 +0100

----------------------------------------------------------------------
 docs/dev/batch/index.md                       | 20 +++----
 docs/dev/connectors/elasticsearch.md          |  6 +-
 docs/dev/connectors/kafka.md                  | 24 ++++----
 docs/dev/connectors/kinesis.md                | 52 ++++++++---------
 docs/dev/connectors/twitter.md                | 12 ++--
 docs/dev/datastream_api.md                    |  2 +-
 docs/dev/event_timestamps_watermarks.md       | 12 ++--
 docs/dev/libs/cep.md                          | 30 +++++-----
 docs/dev/libs/gelly/graph_api.md              |  2 +-
 docs/dev/libs/gelly/library_methods.md        |  2 +-
 docs/dev/stream/operators/asyncio.md          |  2 +-
 docs/dev/stream/state/custom_serialization.md |  2 +-
 docs/dev/stream/testing.md                    |  4 +-
 docs/dev/table/tableApi.md                    | 68 +++++++++++-----------
 docs/dev/table/udfs.md                        | 14 ++---
 15 files changed, 126 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ff8b2098/docs/dev/batch/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/batch/index.md b/docs/dev/batch/index.md
index 7fb84e8..cb3b42c 100644
--- a/docs/dev/batch/index.md
+++ b/docs/dev/batch/index.md
@@ -571,7 +571,7 @@ data.reduceGroup { elements => elements.sum }
         data set.</p>
 {% highlight scala %}
 val input: DataSet[(Int, String, Double)] = // [...]
-val output: DataSet[(Int, String, Doublr)] = input.aggregate(SUM, 0).aggregate(MIN, 2);
+val output: DataSet[(Int, String, Double)] = input.aggregate(SUM, 0).aggregate(MIN, 2)
 {% endhighlight %}
   <p>You can also use short-hand syntax for minimum, maximum, and sum aggregations.</p>
 {% highlight scala %}
@@ -1037,7 +1037,7 @@ val csvInput = env.readCsvFile[Person](
 val values = env.fromElements("Foo", "bar", "foobar", "fubar")
 
 // generate a number sequence
-val numbers = env.generateSequence(1, 10000000);
+val numbers = env.generateSequence(1, 10000000)
 
 // read a file from the specified path of type TextInputFormat
 val tuples = env.readHadoopFile(new TextInputFormat, classOf[LongWritable],
@@ -1288,7 +1288,7 @@ val values: DataSet[(String, Int, Double)] = // [...]
 values.writeAsCsv("file:///path/to/the/result/file", "\n", "|")
 
 // this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
-values.writeAsText("file:///path/to/the/result/file");
+values.writeAsText("file:///path/to/the/result/file")
 
 // this writes values as strings using a user-defined formatting
 values map { tuple => tuple._1 + " - " + tuple._2 }
@@ -1309,19 +1309,19 @@ val pData: DataSet[(BookPojo, Double)] = // [...]
 val sData: DataSet[String] = // [...]
 
 // sort output on String field in ascending order
-tData.sortPartition(1, Order.ASCENDING).print;
+tData.sortPartition(1, Order.ASCENDING).print()
 
 // sort output on Double field in descending and Int field in ascending order
-tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print;
+tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print()
 
 // sort output on the "author" field of nested BookPojo in descending order
-pData.sortPartition("_1.author", Order.DESCENDING).writeAsText(...);
+pData.sortPartition("_1.author", Order.DESCENDING).writeAsText(...)
 
 // sort output on the full tuple in ascending order
-tData.sortPartition("_", Order.ASCENDING).writeAsCsv(...);
+tData.sortPartition("_", Order.ASCENDING).writeAsCsv(...)
 
 // sort atomic type (String) output in descending order
-sData.sortPartition("_", Order.DESCENDING).writeAsText(...);
+sData.sortPartition("_", Order.DESCENDING).writeAsText(...)
 
 {% endhighlight %}
 
@@ -1486,7 +1486,7 @@ val result = count map { c => c / 10000.0 * 4 }
 
 result.print()
 
-env.execute("Iterative Pi Example");
+env.execute("Iterative Pi Example")
 {% endhighlight %}
 
 You can also check out the
@@ -1693,7 +1693,7 @@ val env = ExecutionEnvironment.createLocalEnvironment()
 val lines = env.readTextFile(pathToTextFile)
 // build your program
 
-env.execute();
+env.execute()
 {% endhighlight %}
 </div>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/ff8b2098/docs/dev/connectors/elasticsearch.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md
index b6ee63c..8774fcb 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -159,7 +159,7 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc
     return Requests.indexRequest()
             .index("my-index")
             .type("my-type")
-            .source(json);
+            .source(json)
   }
 }))
 {% endhighlight %}
@@ -185,7 +185,7 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc
     return Requests.indexRequest()
             .index("my-index")
             .type("my-type")
-            .source(json);
+            .source(json)
   }
 }))
 {% endhighlight %}
@@ -298,7 +298,7 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String
     return Requests.indexRequest()
             .index("my-index")
             .type("my-type")
-            .source(json);
+            .source(json)
   }
 }))
 {% endhighlight %}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff8b2098/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 6c80370..daf1903 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -130,14 +130,14 @@ DataStream<String> stream = env
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val properties = new Properties();
-properties.setProperty("bootstrap.servers", "localhost:9092");
+val properties = new Properties()
+properties.setProperty("bootstrap.servers", "localhost:9092")
 // only required for Kafka 0.8
-properties.setProperty("zookeeper.connect", "localhost:2181");
-properties.setProperty("group.id", "test");
+properties.setProperty("zookeeper.connect", "localhost:2181")
+properties.setProperty("group.id", "test")
 stream = env
     .addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
-    .print
+    .print()
 {% endhighlight %}
 </div>
 </div>
@@ -422,17 +422,17 @@ DataStream<String> stream = env
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val properties = new Properties();
-properties.setProperty("bootstrap.servers", "localhost:9092");
+val properties = new Properties()
+properties.setProperty("bootstrap.servers", "localhost:9092")
 // only required for Kafka 0.8
-properties.setProperty("zookeeper.connect", "localhost:2181");
-properties.setProperty("group.id", "test");
+properties.setProperty("zookeeper.connect", "localhost:2181")
+properties.setProperty("group.id", "test")
 
-val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties);
-myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
+val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
+myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
 stream = env
     .addSource(myConsumer)
-    .print
+    .print()
 {% endhighlight %}
 </div>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/ff8b2098/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index 2c8b88a..ff22ee0 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -86,11 +86,11 @@ DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val consumerConfig = new Properties();
-consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+val consumerConfig = new Properties()
+consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1")
+consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
 
 val env = StreamExecutionEnvironment.getEnvironment
 
@@ -295,28 +295,28 @@ simpleStringStream.addSink(kinesis);
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val producerConfig = new Properties();
+val producerConfig = new Properties()
 // Required configs
-producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
 // Optional KPL configs
-producerConfig.put("AggregationMaxCount", "4294967295");
-producerConfig.put("CollectionMaxCount", "1000");
-producerConfig.put("RecordTtl", "30000");
-producerConfig.put("RequestTimeout", "6000");
-producerConfig.put("ThreadPoolSize", "15");
+producerConfig.put("AggregationMaxCount", "4294967295")
+producerConfig.put("CollectionMaxCount", "1000")
+producerConfig.put("RecordTtl", "30000")
+producerConfig.put("RequestTimeout", "6000")
+producerConfig.put("ThreadPoolSize", "15")
 
 // Switch KinesisProducer's threading model
-// producerConfig.put("ThreadingModel", "PER_REQUEST");
+// producerConfig.put("ThreadingModel", "PER_REQUEST")
 
-val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig);
-kinesis.setFailOnError(true);
-kinesis.setDefaultStream("kinesis_stream_name");
-kinesis.setDefaultPartition("0");
+val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig)
+kinesis.setFailOnError(true)
+kinesis.setDefaultStream("kinesis_stream_name")
+kinesis.setDefaultPartition("0")
 
-val simpleStringStream = ...;
-simpleStringStream.addSink(kinesis);
+val simpleStringStream = ...
+simpleStringStream.addSink(kinesis)
 {% endhighlight %}
 </div>
 </div>
@@ -359,11 +359,11 @@ producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val producerConfig = new Properties();
-producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
+val producerConfig = new Properties()
+producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567")
 {% endhighlight %}
 </div>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/ff8b2098/docs/dev/connectors/twitter.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/twitter.md b/docs/dev/connectors/twitter.md
index a563be6..e6fe32a 100644
--- a/docs/dev/connectors/twitter.md
+++ b/docs/dev/connectors/twitter.md
@@ -67,12 +67,12 @@ DataStream<String> streamSource = env.addSource(new TwitterSource(props));
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val props = new Properties();
-props.setProperty(TwitterSource.CONSUMER_KEY, "");
-props.setProperty(TwitterSource.CONSUMER_SECRET, "");
-props.setProperty(TwitterSource.TOKEN, "");
-props.setProperty(TwitterSource.TOKEN_SECRET, "");
-DataStream<String> streamSource = env.addSource(new TwitterSource(props));
+val props = new Properties()
+props.setProperty(TwitterSource.CONSUMER_KEY, "")
+props.setProperty(TwitterSource.CONSUMER_SECRET, "")
+props.setProperty(TwitterSource.TOKEN, "")
+props.setProperty(TwitterSource.TOKEN_SECRET, "")
+val streamSource = env.addSource(new TwitterSource(props))
 {% endhighlight %}
 </div>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/ff8b2098/docs/dev/datastream_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/datastream_api.md b/docs/dev/datastream_api.md
index 307679d..6bb755e 100644
--- a/docs/dev/datastream_api.md
+++ b/docs/dev/datastream_api.md
@@ -113,7 +113,7 @@ object WindowWordCount {
       .timeWindow(Time.seconds(5))
       .sum(1)
 
-    counts.print
+    counts.print()
 
     env.execute("Window Stream WordCount")
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff8b2098/docs/dev/event_timestamps_watermarks.md
----------------------------------------------------------------------
diff --git a/docs/dev/event_timestamps_watermarks.md b/docs/dev/event_timestamps_watermarks.md
index 802a079..acde9e4 100644
--- a/docs/dev/event_timestamps_watermarks.md
+++ b/docs/dev/event_timestamps_watermarks.md
@@ -154,7 +154,7 @@ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
 val stream: DataStream[MyEvent] = env.readFile(
          myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
-         FilePathFilter.createDefaultFilter());
+         FilePathFilter.createDefaultFilter())
 
 val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
         .filter( _.severity == WARNING )
@@ -240,19 +240,19 @@ public class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks<My
  */
 class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
 
-    val maxOutOfOrderness = 3500L; // 3.5 seconds
+    val maxOutOfOrderness = 3500L // 3.5 seconds
 
-    var currentMaxTimestamp: Long;
+    var currentMaxTimestamp: Long
 
     override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
         val timestamp = element.getCreationTime()
         currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
-        timestamp;
+        timestamp
     }
 
     override def getCurrentWatermark(): Watermark = {
         // return the watermark as current highest timestamp minus the out-of-orderness bound
-        new Watermark(currentMaxTimestamp - maxOutOfOrderness);
+        new Watermark(currentMaxTimestamp - maxOutOfOrderness)
     }
 }
 
@@ -262,7 +262,7 @@ class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEv
  */
 class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
 
-    val maxTimeLag = 5000L; // 5 seconds
+    val maxTimeLag = 5000L // 5 seconds
 
     override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
         element.getCreationTime

http://git-wip-us.apache.org/repos/asf/flink/blob/ff8b2098/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 506ddfe..d814a38 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -210,40 +210,40 @@ For a pattern named `start`, the following are valid quantifiers:
  start.times(4).optional()
 
  // expecting 2, 3 or 4 occurrences
- start.times(2, 4);
+ start.times(2, 4)
 
  // expecting 2, 3 or 4 occurrences and repeating as many as possible
- start.times(2, 4).greedy();
+ start.times(2, 4).greedy()
 
  // expecting 0, 2, 3 or 4 occurrences
- start.times(2, 4).optional();
+ start.times(2, 4).optional()
 
  // expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
- start.times(2, 4).optional().greedy();
+ start.times(2, 4).optional().greedy()
 
  // expecting 1 or more occurrences
  start.oneOrMore()
 
  // expecting 1 or more occurrences and repeating as many as possible
- start.oneOrMore().greedy();
+ start.oneOrMore().greedy()
 
  // expecting 0 or more occurrences
  start.oneOrMore().optional()
 
  // expecting 0 or more occurrences and repeating as many as possible
- start.oneOrMore().optional().greedy();
+ start.oneOrMore().optional().greedy()
 
  // expecting 2 or more occurrences
- start.timesOrMore(2);
+ start.timesOrMore(2)
 
  // expecting 2 or more occurrences and repeating as many as possible
- start.timesOrMore(2).greedy();
+ start.timesOrMore(2).greedy()
 
  // expecting 0, 2 or more occurrences
- start.timesOrMore(2).optional();
+ start.timesOrMore(2).optional()
 
  // expecting 0, 2 or more occurrences and repeating as many as possible
- start.timesOrMore(2).optional().greedy();
+ start.timesOrMore(2).optional().greedy()
  {% endhighlight %}
  </div>
  </div>
@@ -729,7 +729,7 @@ pattern.times(2)
              <p>By default a relaxed internal contiguity (between subsequent events)
is used. For more info on
              internal contiguity see <a href="#consecutive_java">consecutive</a>.</p>
 {% highlight scala %}
-pattern.times(2, 4);
+pattern.times(2, 4)
 {% endhighlight %}
          </td>
        </tr>
@@ -765,7 +765,7 @@ pattern.oneOrMore().greedy()
 Pattern.begin("start").where(_.getName().equals("c"))
   .followedBy("middle").where(_.getName().equals("a"))
                        .oneOrMore().consecutive()
-  .followedBy("end1").where(_.getName().equals("b"));
+  .followedBy("end1").where(_.getName().equals("b"))
 {% endhighlight %}
 
             <p>Will generate the following matches for an input sequence: C D A1 A2
A3 D A4 B</p>
@@ -786,7 +786,7 @@ Pattern.begin("start").where(_.getName().equals("c"))
 Pattern.begin("start").where(_.getName().equals("c"))
   .followedBy("middle").where(_.getName().equals("a"))
                        .oneOrMore().allowCombinations()
-  .followedBy("end1").where(_.getName().equals("b"));
+  .followedBy("end1").where(_.getName().equals("b"))
 {% endhighlight %}
 
                       <p>Will generate the following matches for an input sequence:
C D A1 A2 A3 D A4 B</p>
@@ -1491,7 +1491,7 @@ val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.select(outp
     pattern: Map[String, Iterable[Event]] => ComplexEvent()
 }
 
-val timeoutResult: DataStream<TimeoutEvent> = result.getSideOutput(outputTag);
+val timeoutResult: DataStream<TimeoutEvent> = result.getSideOutput(outputTag)
 ~~~
 
 The `flatSelect` API call offers the same overloaded version which takes as the first parameter a timeout function and as second parameter a selection function.
@@ -1510,7 +1510,7 @@ val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.flatSelect(
         out.collect(ComplexEvent())
 }
 
-val timeoutResult: DataStream<TimeoutEvent> = result.getSideOutput(outputTag);
+val timeoutResult: DataStream<TimeoutEvent> = result.getSideOutput(outputTag)
 ~~~
 
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/ff8b2098/docs/dev/libs/gelly/graph_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/graph_api.md b/docs/dev/libs/gelly/graph_api.md
index 465c24f..f00275e 100644
--- a/docs/dev/libs/gelly/graph_api.md
+++ b/docs/dev/libs/gelly/graph_api.md
@@ -773,7 +773,7 @@ final class SelectLargeWeightNeighbors extends NeighborsFunctionWithVertexValue[
 
 			for (neighbor <- neighbors) {
 				if (neighbor._1.getValue() > 0.5) {
-					out.collect(vertex, neighbor._2);
+					out.collect(vertex, neighbor._2)
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff8b2098/docs/dev/libs/gelly/library_methods.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/library_methods.md b/docs/dev/libs/gelly/library_methods.md
index 93a2c5d..015f85a 100644
--- a/docs/dev/libs/gelly/library_methods.md
+++ b/docs/dev/libs/gelly/library_methods.md
@@ -55,7 +55,7 @@ val graph: Graph[java.lang.Long, java.lang.Long, NullValue] = ...
 val verticesWithCommunity = graph.run(new LabelPropagation[java.lang.Long, java.lang.Long, NullValue](30))
 
 // print the result
-verticesWithCommunity.print
+verticesWithCommunity.print()
 
 {% endhighlight %}
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/ff8b2098/docs/dev/stream/operators/asyncio.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/asyncio.md b/docs/dev/stream/operators/asyncio.md
index 32945e4..c473638 100644
--- a/docs/dev/stream/operators/asyncio.md
+++ b/docs/dev/stream/operators/asyncio.md
@@ -150,7 +150,7 @@ class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
         // set the callback to be executed once the request by the client is complete
         // the callback simply forwards the result to the result future
         resultFuture.onSuccess {
-            case result: String => resultFuture.complete(Iterable((str, result)));
+            case result: String => resultFuture.complete(Iterable((str, result)))
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff8b2098/docs/dev/stream/state/custom_serialization.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state/custom_serialization.md b/docs/dev/stream/state/custom_serialization.md
index ca6b07d..7f886d2 100644
--- a/docs/dev/stream/state/custom_serialization.md
+++ b/docs/dev/stream/state/custom_serialization.md
@@ -61,7 +61,7 @@ val descriptor = new ListStateDescriptor[(String, Integer)](
     new CustomTypeSerializer)
 )
 
-checkpointedState = getRuntimeContext.getListState(descriptor);
+checkpointedState = getRuntimeContext.getListState(descriptor)
 {% endhighlight %}
 </div>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/ff8b2098/docs/dev/stream/testing.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/testing.md b/docs/dev/stream/testing.md
index e5bc024..ce31629 100644
--- a/docs/dev/stream/testing.md
+++ b/docs/dev/stream/testing.md
@@ -247,8 +247,8 @@ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-env.enableCheckpointing(500);
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
+env.enableCheckpointing(500)
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))
 {% endhighlight %}
 </div>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/ff8b2098/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index f5a2059..da6d2e4 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -131,7 +131,7 @@ val result: Table = orders
         .select('a.lowerCase(), 'b, 'rowtime)
         .window(Tumble over 1.hour on 'rowtime as 'hourlyWindow)
         .groupBy('hourlyWindow, 'a)
-        .select('a, 'hourlyWindow.end as 'hour, 'b.avg as 'avgBillingAmount);
+        .select('a, 'hourlyWindow.end as 'hour, 'b.avg as 'avgBillingAmount)
 {% endhighlight %}
 
 </div>
@@ -355,7 +355,7 @@ Table result = orders
       </td>
       <td>
        <p>Similar to a SQL OVER clause. Over window aggregates are computed for each
row, based on a window (range) of preceding and succeeding rows. See the <a href="#over-windows">over
windows section</a> for more details.</p>
-       {% highlight scala %}
+{% highlight java %}
 Table orders = tableEnv.scan("Orders");
 Table result = orders
     // define window
@@ -364,8 +364,8 @@ Table result = orders
       .orderBy("rowtime")
       .preceding("UNBOUNDED_RANGE")
       .following("CURRENT_RANGE")
-      .as("w")
-    .select("a, b.avg over w, b.max over w, b.min over w") // sliding aggregate
+      .as("w"))
+    .select("a, b.avg over w, b.max over w, b.min over w"); // sliding aggregate
 {% endhighlight %}
        <p><b>Note:</b> All aggregates must be defined over the same window,
i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED
and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet.
ORDER BY must be specified on a single <a href="streaming.html#time-attributes">time
attribute</a>.</p>
       </td>
@@ -448,7 +448,7 @@ val result: Table = orders
       preceding UNBOUNDED_RANGE
       following CURRENT_RANGE
       as 'w)
-    .select('a, 'b.avg over 'w, 'b.max over 'w, 'b.min over 'w,) // sliding aggregate
+    .select('a, 'b.avg over 'w, 'b.max over 'w, 'b.min over 'w) // sliding aggregate
 {% endhighlight %}
        <p><b>Note:</b> All aggregates must be defined over the same window,
i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED
and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet.
ORDER BY must be specified on a single <a href="streaming.html#time-attributes">time
attribute</a>.</p>
       </td>
@@ -614,9 +614,9 @@ Table result = orders
       <td>
         <p>Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct
field names and at least one equality join predicate must be defined through join operator
or using a where or filter operator.</p>
 {% highlight scala %}
-val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
-val right = ds2.toTable(tableEnv, 'd, 'e, 'f);
-val result = left.join(right).where('a === 'd).select('a, 'b, 'e);
+val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
+val right = ds2.toTable(tableEnv, 'd, 'e, 'f)
+val result = left.join(right).where('a === 'd).select('a, 'b, 'e)
 {% endhighlight %}
       </td>
     </tr>
@@ -656,12 +656,12 @@ val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
         <p><b>Note:</b> Currently, only <code>INNER</code>
time-windowed joins are supported.</p>
 
 {% highlight scala %}
-val left = ds1.toTable(tableEnv, 'a, 'b, 'c, 'ltime.rowtime);
-val right = ds2.toTable(tableEnv, 'd, 'e, 'f, 'rtime.rowtime);
+val left = ds1.toTable(tableEnv, 'a, 'b, 'c, 'ltime.rowtime)
+val right = ds2.toTable(tableEnv, 'd, 'e, 'f, 'rtime.rowtime)
 
 val result = left.join(right)
   .where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime < 'rtime + 10.minutes)
-  .select('a, 'b, 'e, 'ltime);
+  .select('a, 'b, 'e, 'ltime)
 {% endhighlight %}
       </td>
     </tr>
@@ -856,9 +856,9 @@ Table result = left.select("a, b, c").where("a.in(RightTable)");
       <td>
         <p>Similar to a SQL UNION clause. Unions two tables with duplicate records
removed, both tables must have identical field types.</p>
 {% highlight scala %}
-val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
-val right = ds2.toTable(tableEnv, 'a, 'b, 'c);
-val result = left.union(right);
+val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
+val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
+val result = left.union(right)
 {% endhighlight %}
       </td>
     </tr>
@@ -872,9 +872,9 @@ val result = left.union(right);
       <td>
         <p>Similar to a SQL UNION ALL clause. Unions two tables, both tables must have
identical field types.</p>
 {% highlight scala %}
-val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
-val right = ds2.toTable(tableEnv, 'a, 'b, 'c);
-val result = left.unionAll(right);
+val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
+val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
+val result = left.unionAll(right)
 {% endhighlight %}
       </td>
     </tr>
@@ -887,9 +887,9 @@ val result = left.unionAll(right);
       <td>
         <p>Similar to a SQL INTERSECT clause. Intersect returns records that exist
in both tables. If a record is present in one or both tables more than once, it is returned
just once, i.e., the resulting table has no duplicate records. Both tables must have identical
field types.</p>
 {% highlight scala %}
-val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
-val right = ds2.toTable(tableEnv, 'e, 'f, 'g);
-val result = left.intersect(right);
+val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
+val right = ds2.toTable(tableEnv, 'e, 'f, 'g)
+val result = left.intersect(right)
 {% endhighlight %}
       </td>
     </tr>
@@ -902,9 +902,9 @@ val result = left.intersect(right);
       <td>
         <p>Similar to a SQL INTERSECT ALL clause. IntersectAll returns records that
exist in both tables. If a record is present in both tables more than once, it is returned
as many times as it is present in both tables, i.e., the resulting table might have duplicate
records. Both tables must have identical field types.</p>
 {% highlight scala %}
-val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
-val right = ds2.toTable(tableEnv, 'e, 'f, 'g);
-val result = left.intersectAll(right);
+val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
+val right = ds2.toTable(tableEnv, 'e, 'f, 'g)
+val result = left.intersectAll(right)
 {% endhighlight %}
       </td>
     </tr>
@@ -917,9 +917,9 @@ val result = left.intersectAll(right);
       <td>
         <p>Similar to a SQL EXCEPT clause. Minus returns records from the left table
that do not exist in the right table. Duplicate records in the left table are returned exactly
once, i.e., duplicates are removed. Both tables must have identical field types.</p>
 {% highlight scala %}
-val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
-val right = ds2.toTable(tableEnv, 'a, 'b, 'c);
-val result = left.minus(right);
+val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
+val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
+val result = left.minus(right)
 {% endhighlight %}
       </td>
     </tr>
@@ -932,9 +932,9 @@ val result = left.minus(right);
       <td>
         <p>Similar to a SQL EXCEPT ALL clause. MinusAll returns the records that do
not exist in the right table. A record that is present n times in the left table and m times
in the right table is returned (n - m) times, i.e., as many duplicates as are present in the
right table are removed. Both tables must have identical field types.</p>
 {% highlight scala %}
-val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
-val right = ds2.toTable(tableEnv, 'a, 'b, 'c);
-val result = left.minusAll(right);
+val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
+val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
+val result = left.minusAll(right)
 {% endhighlight %}
       </td>
     </tr>
@@ -947,9 +947,9 @@ val result = left.minusAll(right);
       <td>
         <p>Similar to a SQL IN clause. In returns true if an expression exists in a
given table sub-query. The sub-query table must consist of one column. This column must have
the same data type as the expression.</p>
 {% highlight scala %}
-val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
-val right = ds2.toTable(tableEnv, 'a);
-val result = left.select('a, 'b, 'c).where('a.in(right));
+val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
+val right = ds2.toTable(tableEnv, 'a)
+val result = left.select('a, 'b, 'c).where('a.in(right))
 {% endhighlight %}
       </td>
     </tr>
@@ -1030,8 +1030,8 @@ Table result3 = in.orderBy("a.asc").offset(10).fetch(5);
       <td>
         <p>Similar to a SQL ORDER BY clause. Returns records globally sorted across
all parallel partitions.</p>
 {% highlight scala %}
-val in = ds.toTable(tableEnv, 'a, 'b, 'c);
-val result = in.orderBy('a.asc);
+val in = ds.toTable(tableEnv, 'a, 'b, 'c)
+val result = in.orderBy('a.asc)
 {% endhighlight %}
       </td>
     </tr>

http://git-wip-us.apache.org/repos/asf/flink/blob/ff8b2098/docs/dev/table/udfs.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/udfs.md b/docs/dev/table/udfs.md
index 71567d8..0e09302 100644
--- a/docs/dev/table/udfs.md
+++ b/docs/dev/table/udfs.md
@@ -93,7 +93,7 @@ myTable.select('string, hashCode('string))
 
 // register and use the function in SQL
 tableEnv.registerFunction("hashCode", new HashCode(10))
-tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
+tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable")
 {% endhighlight %}
 </div>
 </div>
@@ -198,17 +198,17 @@ val myTable = ...         // table schema: [a: String]
 // Use the table function in the Scala Table API (Note: No registration required in Scala Table API).
 val split = new Split("#")
 // "as" specifies the field names of the generated table.
-myTable.join(split('a) as ('word, 'length)).select('a, 'word, 'length);
-myTable.leftOuterJoin(split('a) as ('word, 'length)).select('a, 'word, 'length);
+myTable.join(split('a) as ('word, 'length)).select('a, 'word, 'length)
+myTable.leftOuterJoin(split('a) as ('word, 'length)).select('a, 'word, 'length)
 
 // Register the table function to use it in SQL queries.
 tableEnv.registerFunction("split", new Split("#"))
 
 // Use the table function in SQL with LATERAL and TABLE keywords.
 // CROSS JOIN a table function (equivalent to "join" in Table API)
-tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
+tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)")
 // LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API)
-tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN TABLE(split(a)) as T(word, length) ON TRUE");
+tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN TABLE(split(a)) as T(word, length) ON TRUE")
 {% endhighlight %}
 **IMPORTANT:** Do not implement TableFunction as a Scala object. Scala object is a singleton and will cause concurrency issues.
 </div>
@@ -723,7 +723,7 @@ tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
 {% highlight scala %}
 object hashCode extends ScalarFunction {
 
-  var hashcode_factor = 12;
+  var hashcode_factor = 12
 
   override def open(context: FunctionContext): Unit = {
     // access "hashcode_factor" parameter
@@ -743,7 +743,7 @@ myTable.select('string, hashCode('string))
 
 // register and use the function in SQL
 tableEnv.registerFunction("hashCode", hashCode)
-tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
+tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable")
 {% endhighlight %}
 
 </div>


Mime
View raw message