From: GitBox
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: [GitHub] azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector
Message-ID: <153927074258.32757.869617131597557114.gitbox@gitbox.apache.org>
Date: Thu, 11 Oct 2018 15:12:22 -0000

azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r224481690

 ##########
 File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ##########
 @@ -43,70 +48,82 @@
   */
  public abstract class CassandraSinkBase extends RichSinkFunction implements CheckpointedFunction {
      protected final Logger log = LoggerFactory.getLogger(getClass());
 -    protected transient Cluster cluster;
 -    protected transient Session session;
 -    protected transient volatile Throwable exception;
 -    protected transient FutureCallback callback;
 +    // ------------------------ Default Configurations ------------------------
 +
 +    /**
 +     * The default maximum number of concurrent requests. By default, {@code Integer.MAX_VALUE}.
 +     */
 +    public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = Integer.MAX_VALUE;
 +
 +    /**
 +     * The default timeout duration when acquiring a permit to execute. By default, {@code Long.MAX_VALUE}.
 +     */
 +    public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = Long.MAX_VALUE;
 +
 +    /**
 +     * The default timeout unit when acquiring a permit to execute. By default, milliseconds.
 +     */
 +    public static final TimeUnit DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
 +
 +    // ------------------------- Configuration Fields -------------------------
 +
 +    private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
 +    private long maxConcurrentRequestsTimeout = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT;
 +    private TimeUnit maxConcurrentRequestsTimeoutUnit = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT;
 +
 +    // --------------------------- Cassandra Fields ---------------------------
      private final ClusterBuilder builder;
 -    private final AtomicInteger updatesPending = new AtomicInteger();
 +    protected transient Cluster cluster;
 +    protected transient Session session;
 +
 +    // ------------------------ Synchronization Fields ------------------------
 +
 +    private AtomicReference throwable;
 +    private Semaphore semaphore;
 +    private Phaser phaser;
      CassandraSinkBase(ClusterBuilder builder) {
          this.builder = builder;
          ClosureCleaner.clean(builder, true);
      }
 -    @Override
 -    public void open(Configuration configuration) {
 -        this.callback = new FutureCallback() {
 -            @Override
 -            public void onSuccess(V ignored) {
 -                int pending = updatesPending.decrementAndGet();
 -                if (pending == 0) {
 -                    synchronized (updatesPending) {
 -                        updatesPending.notifyAll();
 -                    }
 -                }
 -            }
 +    // ----------------------------- Sink Methods -----------------------------
 +    @Override
 +    public void open(Configuration parameters) {
 +        cluster = createCluster();
 +        session = createSession();
 +
 +        throwable = new AtomicReference<>();
 +        semaphore = new Semaphore(maxConcurrentRequests);
 +        /*
 +         * A Phaser is a flexible and reusable synchronization barrier similar to CyclicBarrier and CountDownLatch.
 +         *
 +         * This Phaser is configured to support "1 + N" parties.
 +         *   - "1" for the CassandraSinkBase to arrive and to await at the Phaser during a flush() call.
 +         *   - "N" for the varying number of invoke() calls that register and de-register with the Phaser.
 +         *
 +         * The Phaser awaits the completion of the advancement of a phase prior to returning from a register() call.
 +         * This behavior ensures that no backlogged invoke() calls register to execute while the Semaphore's permits
 +         * are being released during a flush() call.
 +         */
 +        phaser = new Phaser(1) {

 Review comment:
   The Phaser solution looks good, but I am wondering whether we need such a low-level approach. `send` and `snapshotState` are called synchronously. We could have a concurrent set of futures and use `FutureUtil.waitForAll` to sync in flush. This looks simpler to me. Is there a particular performance reason to use the `Phaser`?

   I would also abstract semaphore/Phaser/futures/addFuture/waitForAll/error away into another reusable and pluggable component. It can also be tested separately.
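
For illustration, the alternative suggested in the review comment (a concurrent set of futures that flush waits on, wrapped in a reusable component) might look roughly like the sketch below. It is not code from the pull request. The class name `PendingRequestTracker` and its methods are hypothetical, and the JDK's `CompletableFuture.allOf` stands in for the `waitForAll` helper mentioned in the comment; the Cassandra driver returns its own future type, which would need adapting. The sketch also assumes, as the comment notes, that `submit` and `flush` are called from the same thread.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical helper: bounds in-flight requests and lets a caller wait for all of them. */
final class PendingRequestTracker {
    private final Semaphore permits;
    private final long timeout;
    private final TimeUnit unit;
    private final Set<CompletableFuture<?>> pending = ConcurrentHashMap.newKeySet();
    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    PendingRequestTracker(int maxConcurrentRequests, long timeout, TimeUnit unit) {
        this.permits = new Semaphore(maxConcurrentRequests);
        this.timeout = timeout;
        this.unit = unit;
    }

    /** Waits for a permit (backpressure), starts the request, and tracks it until it completes. */
    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> request)
            throws InterruptedException, TimeoutException {
        checkFailure();
        if (!permits.tryAcquire(timeout, unit)) {
            throw new TimeoutException("No permit acquired within " + timeout + " " + unit);
        }
        boolean started = false;
        try {
            CompletableFuture<T> future = request.get();
            pending.add(future);
            // The callback runs on whichever thread completes the future.
            future.whenComplete((result, error) -> {
                if (error != null) {
                    failure.compareAndSet(null, error);
                }
                pending.remove(future);
                permits.release();
            });
            started = true;
            return future;
        } finally {
            if (!started) {
                permits.release(); // the supplier threw, so no request is in flight
            }
        }
    }

    /** Blocks until every request submitted so far has completed; rethrows the first failure. */
    void flush() {
        CompletableFuture<?>[] snapshot = pending.toArray(new CompletableFuture<?>[0]);
        try {
            CompletableFuture.allOf(snapshot).join();
        } catch (CompletionException e) {
            failure.compareAndSet(null, e.getCause());
        }
        checkFailure();
    }

    int pendingCount() {
        return pending.size();
    }

    private void checkFailure() {
        Throwable t = failure.get();
        if (t != null) {
            throw new IllegalStateException("An asynchronous request failed", t);
        }
    }
}
```

Under these assumptions the sink would call `submit` from `invoke()` and `flush` from `snapshotState()` and `close()`. Whether this is actually simpler or fast enough compared with the `Phaser` is the open question raised in the comment.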