flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
Date Tue, 24 Jan 2017 21:50:27 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15836693#comment-15836693 ]

ASF GitHub Bot commented on FLINK-3551:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2761#discussion_r97207859
  
    --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala ---
    @@ -0,0 +1,138 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.scala.examples.twitter
    +
    +import java.util.StringTokenizer
    +
    +import org.apache.flink.api.java.utils.ParameterTool
    +import org.apache.flink.api.scala._
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.streaming.connectors.twitter.TwitterSource
    +import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
    +import org.codehaus.jackson.JsonNode
    +import org.codehaus.jackson.map.ObjectMapper
    +
    +import scala.collection.mutable.ListBuffer
    +
    +/**
    +  * Implements the "TwitterStream" program that computes the most used words
    +  * over JSON objects in a streaming fashion.
    +  * <p>
    +  * The input is a Tweet stream from a TwitterSource.
    +  * </p>
    +  * <p>
    +  * Usage: <code>TwitterExample [--output <path>]
    +  * [--twitter-source.consumerKey <key>
    +  * --twitter-source.consumerSecret <secret>
    +  * --twitter-source.token <token>
    +  * --twitter-source.tokenSecret <tokenSecret>]
    +  * </code><br>
    +  *
    +  * If no parameters are provided, the program is run with default data from
    +  * {@link TwitterExampleData}.
    +  * </p>
    +  * <p>
    +  * This example shows how to:
    +  * <ul>
    +  * <li>acquire external data,
    +  * <li>use in-line defined functions,
    +  * <li>handle flattened stream inputs.
    +  * </ul>
    +  */
    +object TwitterExample {
    +
    +  def main(args: Array[String]): Unit = {
    +
    +    // Checking input parameters
    +    val params = ParameterTool.fromArgs(args)
    +    println("Usage: TwitterExample [--output <path>] " +
    +      "[--twitter-source.consumerKey <key> " +
    +      "--twitter-source.consumerSecret <secret> " +
    +      "--twitter-source.token <token> " +
    +      "--twitter-source.tokenSecret <tokenSecret>]")
    +
    +    // set up the execution environment
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +    // make parameters available in the web interface
    +    env.getConfig.setGlobalJobParameters(params)
    +
    +    env.setParallelism(params.getInt("parallelism", 1))
    +
    +    // get input data
    +    val streamSource =
    +      if (params.has(TwitterSource.CONSUMER_KEY) &&
    +        params.has(TwitterSource.CONSUMER_SECRET) &&
    +        params.has(TwitterSource.TOKEN) &&
    +        params.has(TwitterSource.TOKEN_SECRET)
    +      ) {
    +        env.addSource(new TwitterSource(params.getProperties))
    +      } else {
    +        println("Executing TwitterStream example with default props.")
    +        println("Use --twitter-source.consumerKey <key> --twitter-source.consumerSecret <secret> " +
    +          "--twitter-source.token <token> " +
    +          "--twitter-source.tokenSecret <tokenSecret> to specify the authentication info."
    +        )
    +        // get default test text data
    +        env.fromElements(TwitterExampleData.TEXTS: _*)
    +      }
    +
    +    val tweets = streamSource
    +      // selecting English tweets and splitting to (word, 1)
    +      .flatMap { value: String =>
    +      val jsonParser = new ObjectMapper()
    --- End diff --
    
    Creating a new `ObjectMapper()` in each function call, i.e. once for every incoming record, is quite expensive.
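    One way to act on this comment is to create the mapper once per parallel function instance instead of once per record. The sketch below is an illustration only, assuming Flink's Scala API (`RichFlatMapFunction` and its `open(Configuration)` lifecycle hook) and codehaus Jackson 1.9 or later (for `JsonNode.asText()`), the same Jackson package the diff already imports. The class name `SelectEnglishAndTokenizeFlatMap` is hypothetical, and the JSON field names (`user.lang`, `text`) are an assumption about what the truncated "selecting English tweets and splitting to (word, 1)" step inspects.

```scala
import java.util.StringTokenizer

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.codehaus.jackson.JsonNode
import org.codehaus.jackson.map.ObjectMapper

/**
  * Hypothetical flatMap function: keeps English tweets and emits (word, 1).
  * The ObjectMapper is created once in open(), not once per record.
  */
class SelectEnglishAndTokenizeFlatMap extends RichFlatMapFunction[String, (String, Int)] {

  // Transient: not shipped with the serialized function, built in open() instead
  @transient private var jsonParser: ObjectMapper = _

  // Called once per parallel instance before the first record is processed
  override def open(parameters: Configuration): Unit = {
    jsonParser = new ObjectMapper()
  }

  override def flatMap(value: String, out: Collector[(String, Int)]): Unit = {
    val jsonNode = jsonParser.readValue(value, classOf[JsonNode])

    // Assumed tweet layout: {"user": {"lang": "en"}, "text": "..."}
    val isEnglish = jsonNode.has("user") &&
      jsonNode.get("user").has("lang") &&
      jsonNode.get("user").get("lang").asText() == "en"
    val hasText = jsonNode.has("text")

    if (isEnglish && hasText) {
      // StringTokenizer splits on whitespace, so tokens are never blank
      val tokenizer = new StringTokenizer(jsonNode.get("text").asText())
      while (tokenizer.hasMoreTokens) {
        out.collect((tokenizer.nextToken().toLowerCase, 1))
      }
    }
  }
}
```

    With `import org.apache.flink.streaming.api.scala._` in scope for the implicit type information, it would be used as `streamSource.flatMap(new SelectEnglishAndTokenizeFlatMap)`. A lighter alternative that keeps the in-line function style of the example is a `@transient lazy val` holding the mapper, which is likewise initialized once per deserialized function instance.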


> Sync Scala and Java Streaming Examples
> --------------------------------------
>
>                 Key: FLINK-3551
>                 URL: https://issues.apache.org/jira/browse/FLINK-3551
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Examples
>    Affects Versions: 1.0.0
>            Reporter: Stephan Ewen
>            Assignee: Lim Chee Hau
>
> The Scala Examples lag behind the Java Examples



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
