flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types
Date Fri, 22 Sep 2017 15:05:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16176554#comment-16176554
] 

ASF GitHub Bot commented on FLINK-7632:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r140517182
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact
on latency.
     
     <p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>:
The write-ahead log functionality is currently experimental. In many cases it is sufficent
to use the connector without enabling it. Please report problems to the development mailing
list.</p>
     
    +### Checkpointing and Fault Tolerance
    +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action
requests to C* instance.
     
    -#### Example
    +<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>:However,
current Cassandra Sink implementation does not flush the pending mutations  before the checkpoint
was triggered. Thus, some in-flight mutations might not be replayed when the job recovered.
</p>
    +
    +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html)
and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html)
    +
    +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled
at the execution environment:
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    @Override
    -    public Cluster buildCluster(Cluster.Builder builder) {
    -      return builder.addContactPoint("127.0.0.1").build();
    -    }
    -  })
    -  .build();
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +env.enableCheckpointing(5000); // checkpoint every 5000 msecs
     {% endhighlight %}
     </div>
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    override def buildCluster(builder: Cluster.Builder): Cluster = {
    -      builder.addContactPoint("127.0.0.1").build()
    -    }
    -  })
    -  .build()
    +val env = StreamExecutionEnvironment.getExecutionEnvironment()
    +env.enableCheckpointing(5000) // checkpoint every 5000 msecs
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +## Examples
    +
    +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink
automatically detects which type of input is used. For general use case of those streaming
data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html).
We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
for Pojo and Java Tuple data types respectively.
    +
    +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount`
have been created.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="CQL" markdown="1">
    +{% highlight sql %}
    +CREATE KEYSPACE IF NOT EXISTS example
    +    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    +CREATE TABLE IF NOT EXISTS example.wordcount (
    +    word text,
    +    count bigint,
    +    PRIMARY KEY(word)
    +    );
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Cassandra Sink Example for Streaming Java Tuple Data Type
    +While storing the result with Java Tuple data type to a Cassandra sink, it is required
to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database.
With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts
each Tuple elements as parameters to the statement.
    +
    +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java
Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
    +
    +Please note that if the upsert query were not set, an `IllegalArgumentException` would
be thrown with the following error message `Query must not be null or empty.`
    --- End diff --
    
    I would leave this out. Its one of those things that are easily out-dated, and don't provide
immediate value when reading the docs for the first time. If a user stumbles upon this the
error message should be self-explanatory; if it isn't we should change it accordingly.


> Better documentation and examples on C* sink usage for Pojo and Tuples data types
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-7632
>                 URL: https://issues.apache.org/jira/browse/FLINK-7632
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cassandra Connector, Documentation
>            Reporter: Michael Fong
>            Assignee: Michael Fong
>
> Cassandra sink supports Pojo and Java Tuple data types. We should improve documentation
on its usage as well as some concrete / meaningful examples for both cases. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message