Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 352BD18AF6 for ; Thu, 19 Nov 2015 12:43:59 +0000 (UTC) Received: (qmail 20456 invoked by uid 500); 19 Nov 2015 12:43:57 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 20393 invoked by uid 500); 19 Nov 2015 12:43:57 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 20326 invoked by uid 99); 19 Nov 2015 12:43:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Nov 2015 12:43:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5930BE17D0; Thu, 19 Nov 2015 12:43:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Date: Thu, 19 Nov 2015 12:43:59 -0000 Message-Id: <22761fd5adf74da9bb6ee2a9e76ffd64@git.apache.org> In-Reply-To: <50391953bb66466f8f151318f2ffb7ca@git.apache.org> References: <50391953bb66466f8f151318f2ffb7ca@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/3] flink git commit: FLINK-3041: Twitter Streaming Description section of Streaming Programming guide refers to an incorrect example 'TwitterLocal' FLINK-3041: Twitter Streaming Description section of Streaming Programming guide refers to an incorrect example 'TwitterLocal' This closes #1379. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/be27a188 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/be27a188 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/be27a188 Branch: refs/heads/release-0.10 Commit: be27a188fb2e4979bffe7b4b624e7f9243cebba5 Parents: df3347b Author: smarthi Authored: Wed Nov 18 12:48:38 2015 -0500 Committer: Till Rohrmann Committed: Thu Nov 19 13:43:27 2015 +0100 ---------------------------------------------------------------------- docs/apis/streaming_guide.md | 6 +++--- .../connectors/twitter/TwitterFilterSource.java | 17 ++++++++--------- .../connectors/twitter/TwitterSource.java | 4 ++-- .../connectors/twitter/TwitterStreaming.java | 4 ++-- .../connectors/twitter/TwitterTopology.java | 4 ++-- .../streaming/connectors/json/JSONParserTest.java | 1 - .../streaming/connectors/json/JSONParserTest2.java | 1 - .../examples/iteration/IterateExample.java | 14 ++++++-------- .../streaming/examples/twitter/TwitterStream.java | 4 ++-- .../GroupedProcessingTimeWindowExample.java | 2 +- .../examples/windowing/TopSpeedWindowing.java | 8 ++++---- 11 files changed, 30 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/be27a188/docs/apis/streaming_guide.md ---------------------------------------------------------------------- diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md index 3bb2ee8..a5b8b5b 100644 --- a/docs/apis/streaming_guide.md +++ b/docs/apis/streaming_guide.md @@ -3929,7 +3929,7 @@ In order to connect to Twitter stream the user has to register their program and #### Acquiring the authentication information First of all, a Twitter account is needed. Sign up for free at [twitter.com/signup](https://twitter.com/signup) or sign in at Twitter's [Application Management](https://apps.twitter.com/) and register the application by clicking on the "Create New App" button. Fill out a form about your program and accept the Terms and Conditions. -After selecting the application, the API key and API secret (called `consumerKey` and `sonsumerSecret` in `TwitterSource` respectively) is located on the "API Keys" tab. The necessary access token data (`token` and `secret`) can be acquired here. +After selecting the application, the API key and API secret (called `consumerKey` and `consumerSecret` in `TwitterSource` respectively) is located on the "API Keys" tab. The necessary OAuth Access Token data (`token` and `secret` in `TwitterSource`) can be generated and acquired on the "Keys and Access Tokens" tab. Remember to keep these pieces of information secret and do not push them to public repositories. #### Accessing the authentication information @@ -3947,7 +3947,7 @@ consumerKey=*** The `TwitterSource` class has two constructors. 1. `public TwitterSource(String authPath, int numberOfTweets);` -to emit finite number of tweets +to emit a finite number of tweets 2. `public TwitterSource(String authPath);` for streaming @@ -3990,7 +3990,7 @@ function which can be use to acquire the value of a given field. There are two basic types of tweets. The usual tweets contain information such as date and time of creation, id, user, language and many more details. The other type is the delete information. #### Example -`TwitterLocal` is an example how to use `TwitterSource`. It implements a language frequency counter program. +`TwitterStream` is an example of how to use `TwitterSource`. It implements a language frequency counter program. [Back to top](#top) http://git-wip-us.apache.org/repos/asf/flink/blob/be27a188/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java index 8dd4458..2894322 100644 --- a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java +++ b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java @@ -39,22 +39,21 @@ import com.twitter.hbc.httpclient.auth.Authentication; */ public class TwitterFilterSource extends TwitterSource { - private static final Logger LOG = LoggerFactory - .getLogger(TwitterFilterSource.class); + private static final Logger LOG = LoggerFactory.getLogger(TwitterFilterSource.class); private static final long serialVersionUID = 1L; - private List trackTerms = new LinkedList(); + private List trackTerms = new LinkedList<>(); - private List languages = new LinkedList(); + private List languages = new LinkedList<>(); - private List followings = new LinkedList(); + private List followings = new LinkedList<>(); - private List locations = new LinkedList(); + private List locations = new LinkedList<>(); - private Map queryParameters = new HashMap(); + private Map queryParameters = new HashMap<>(); - private Map postParameters = new HashMap(); + private Map postParameters = new HashMap<>(); public TwitterFilterSource(String authPath) { super(authPath); @@ -66,7 +65,7 @@ public class TwitterFilterSource extends TwitterSource { if (LOG.isInfoEnabled()) { LOG.info("Initializing Twitter Streaming API connection"); } - queue = new LinkedBlockingQueue(queueSize); + queue = new LinkedBlockingQueue<>(queueSize); StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint(); configEndpoint(endpoint); http://git-wip-us.apache.org/repos/asf/flink/blob/be27a188/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java index bad0f8c..d99af82 100644 --- a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java +++ b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java @@ -77,7 +77,7 @@ public class TwitterSource extends RichSourceFunction { * @param authPath * Location of the properties file containing the required * authentication information. - * @param numberOfTweets + * @param numberOfTweets max number of tweets * */ public TwitterSource(String authPath, int numberOfTweets) { @@ -101,7 +101,7 @@ public class TwitterSource extends RichSourceFunction { LOG.info("Initializing Twitter Streaming API connection"); } - queue = new LinkedBlockingQueue(queueSize); + queue = new LinkedBlockingQueue<>(queueSize); StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint(); endpoint.stallWarnings(false); http://git-wip-us.apache.org/repos/asf/flink/blob/be27a188/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java index a80c32a..3e8ce1b 100644 --- a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java +++ b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java @@ -58,7 +58,7 @@ public class TwitterStreaming { public void flatMap(String value, Collector> out) throws Exception { try { - out.collect(new Tuple5( + out.collect(new Tuple5<>( getLong(value, "id"), getInt(value, "entities.hashtags[0].indices[1]"), getString(value, "lang"), @@ -74,7 +74,7 @@ public class TwitterStreaming { public static void main(String[] args) throws Exception { - String path = new String(); + String path; if (args != null && args.length == 1) { path = args[0]; http://git-wip-us.apache.org/repos/asf/flink/blob/be27a188/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java index b1fc92c..d88184c 100644 --- a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java +++ b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java @@ -58,7 +58,7 @@ public class TwitterTopology { public static void main(String[] args) throws Exception { - String path = new String(); + String path; if (args != null && args.length == 1) { path = args[0]; @@ -79,7 +79,7 @@ public class TwitterTopology { @Override public Tuple2 map(String value) throws Exception { - return new Tuple2(value, 1); + return new Tuple2<>(value, 1); } }) .keyBy(0) http://git-wip-us.apache.org/repos/asf/flink/blob/be27a188/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java b/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java index b1d4115..33108c9 100644 --- a/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java +++ b/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java @@ -23,7 +23,6 @@ import static org.junit.Assert.fail; import java.util.Arrays; import java.util.Collection; -import org.apache.flink.streaming.connectors.json.JSONParser; import org.apache.sling.commons.json.JSONException; import org.apache.sling.commons.json.JSONObject; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/be27a188/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java b/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java index 8851086..eb796b4 100644 --- a/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java +++ b/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import org.apache.flink.streaming.connectors.json.JSONParser; import org.apache.sling.commons.json.JSONException; import org.apache.sling.commons.json.JSONObject; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/be27a188/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java ---------------------------------------------------------------------- diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java index 52ec896..b6e1a61 100644 --- a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java +++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java @@ -121,7 +121,7 @@ public class IterateExample { int first = rnd.nextInt(BOUND / 2 - 1) + 1; int second = rnd.nextInt(BOUND / 2 - 1) + 1; - ctx.collect(new Tuple2(first, second)); + ctx.collect(new Tuple2<>(first, second)); counter++; Thread.sleep(50L); } @@ -143,7 +143,7 @@ public class IterateExample { public Tuple2 map(String value) throws Exception { String record = value.substring(1, value.length() - 1); String[] splitted = record.split(","); - return new Tuple2(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1])); + return new Tuple2<>(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1])); } } @@ -158,7 +158,7 @@ public class IterateExample { @Override public Tuple5 map(Tuple2 value) throws Exception { - return new Tuple5(value.f0, value.f1, value.f0, value.f1, 0); + return new Tuple5<>(value.f0, value.f1, value.f0, value.f1, 0); } } @@ -173,8 +173,7 @@ public class IterateExample { @Override public Tuple5 map(Tuple5 value) throws Exception { - return new Tuple5(value.f0, value.f1, value.f3, value.f2 + - value.f3, ++value.f4); + return new Tuple5<>(value.f0, value.f1, value.f3, value.f2 + value.f3, ++value.f4); } } @@ -186,7 +185,7 @@ public class IterateExample { @Override public Iterable select(Tuple5 value) { - List output = new ArrayList(); + List output = new ArrayList<>(); if (value.f2 < BOUND && value.f3 < BOUND) { output.add("iterate"); } else { @@ -207,8 +206,7 @@ public class IterateExample { public Tuple2, Integer> map(Tuple5 value) throws Exception { - return new Tuple2, Integer>(new Tuple2(value.f0, value.f1), - value.f4); + return new Tuple2<>(new Tuple2<>(value.f0, value.f1), value.f4); } } http://git-wip-us.apache.org/repos/asf/flink/blob/be27a188/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java index 06872f0..d26dc42 100644 --- a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java +++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java @@ -110,8 +110,8 @@ public class TwitterStream { while (tokenizer.hasMoreTokens()) { String result = tokenizer.nextToken().replaceAll("\\s*", "").toLowerCase(); - if (result != null && !result.equals("")) { - out.collect(new Tuple2(result, 1)); + if (!result.equals("")) { + out.collect(new Tuple2<>(result, 1)); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/be27a188/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java ---------------------------------------------------------------------- diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java index 982b73d..f08069b 100644 --- a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java +++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java @@ -59,7 +59,7 @@ public class GroupedProcessingTimeWindowExample { while (running && count < numElements) { count++; - ctx.collect(new Tuple2(val++, 1L)); + ctx.collect(new Tuple2<>(val++, 1L)); if (val > numKeys) { val = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/be27a188/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java ---------------------------------------------------------------------- diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java index df3402e..30eda67 100644 --- a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java +++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java @@ -64,9 +64,12 @@ public class TopSpeedWindowing { if (fileInput) { carData = env.readTextFile(inputPath).map(new ParseCarData()); } else { + int numOfCars = 2; carData = env.addSource(CarSource.create(numOfCars)); } + int evictionSec = 10; + double triggerMeters = 50; DataStream> topSpeeds = carData .assignTimestamps(new CarTimestamp()) .keyBy(0) @@ -133,7 +136,7 @@ public class TopSpeedWindowing { speeds[carId] = Math.max(0, speeds[carId] - 5); } distances[carId] += speeds[carId] / 3.6d; - Tuple4 record = new Tuple4(carId, + Tuple4 record = new Tuple4<>(carId, speeds[carId], distances[carId], System.currentTimeMillis()); ctx.collect(record); counter++; @@ -186,9 +189,6 @@ public class TopSpeedWindowing { private static boolean fileInput = false; private static boolean fileOutput = false; - private static int numOfCars = 2; - private static int evictionSec = 10; - private static double triggerMeters = 50; private static String inputPath; private static String outputPath;