From issues-return-193880-archive-asf-public=cust-asf.ponee.io@flink.apache.org Thu Oct 11 17:13:07 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id E984418067A for ; Thu, 11 Oct 2018 17:13:06 +0200 (CEST) Received: (qmail 56556 invoked by uid 500); 11 Oct 2018 15:13:06 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 56546 invoked by uid 99); 11 Oct 2018 15:13:06 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Oct 2018 15:13:06 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 77D06C0CD0 for ; Thu, 11 Oct 2018 15:13:05 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -109.501 X-Spam-Level: X-Spam-Status: No, score=-109.501 tagged_above=-999 required=6.31 tests=[ENV_AND_HDR_SPF_MATCH=-0.5, KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_MED=-2.3, SPF_PASS=-0.001, USER_IN_DEF_SPF_WL=-7.5, USER_IN_WHITELIST=-100] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id 5wrN_UIkJX_h for ; Thu, 11 Oct 2018 15:13:04 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTP id B02735FBB7 for ; Thu, 11 Oct 2018 15:13:03 +0000 (UTC) Received: from jira-lw-us.apache.org (unknown [207.244.88.139]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id E5C1EE2643 for ; Thu, 11 Oct 2018 15:13:02 +0000 (UTC) Received: from jira-lw-us.apache.org (localhost [127.0.0.1]) by jira-lw-us.apache.org (ASF Mail Server at jira-lw-us.apache.org) with ESMTP id B4B6024DDD for ; Thu, 11 Oct 2018 15:13:01 +0000 (UTC) Date: Thu, 11 Oct 2018 15:13:01 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: issues@flink.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 [ https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646581#comment-16646581 ] ASF GitHub Bot commented on FLINK-9083: --------------------------------------- azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r224475996 ########## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ########## @@ -43,70 +48,82 @@ */ public abstract class CassandraSinkBase extends RichSinkFunction implements CheckpointedFunction { protected final Logger log = LoggerFactory.getLogger(getClass()); - protected transient Cluster cluster; - protected transient Session session; - protected transient volatile Throwable exception; - protected transient FutureCallback callback; + // ------------------------ Default Configurations ------------------------ + + /** + * The default maximum number of concurrent requests. By default, {@code Integer.MAX_VALUE}. + */ + public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = Integer.MAX_VALUE; Review comment: maybe minor thing, I would rather say that these default constants belong to the builder where they are actually used and can be documentation for themselves. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org > Add async backpressure support to Cassandra Connector > ----------------------------------------------------- > > Key: FLINK-9083 > URL: https://issues.apache.org/jira/browse/FLINK-9083 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector > Reporter: Jacob Park > Assignee: Jacob Park > Priority: Minor > Labels: pull-request-available > > As the CassandraSinkBase derivatives utilize async writes, they do not block the task to introduce any backpressure. > I am currently using a semaphore to provide backpressure support by blocking at a maximum concurrent requests limit like how DataStax's Spark Cassandra Connector functions: [https://github.com/datastax/spark-cassandra-connector/blob/v2.0.7/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala#L18] > This improvement has greatly improved the fault-tolerance of our Cassandra Sink Connector implementation on Apache Flink in production. I would like to contribute this feature back upstream. -- This message was sent by Atlassian JIRA (v7.6.3#76005)