Date: Thu, 10 Sep 2015 10:21:45 +0000 (UTC)
From: "Robert Metzger (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: [jira] [Commented] (FLINK-2624) RabbitMQ source / sink should participate in checkpointing

    [ https://issues.apache.org/jira/browse/FLINK-2624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14738549#comment-14738549 ]

Robert Metzger commented on FLINK-2624:
---------------------------------------

Why can't you just extend {{RichSourceFunction}} (and implement {{ResultTypeQueryable}} if needed)?

In the {{run()}} method, you need to synchronize both the state update ({{lastProcessedMessage = delivery.getEnvelope().getDeliveryTag()}}) and the passing of the element to the output collector, using the checkpoint lock from the source context.
Otherwise, processing may not be exactly once: the {{while(running)}} loop must be locked out while {{snapshotState()}} is being called.

> RabbitMQ source / sink should participate in checkpointing
> ----------------------------------------------------------
>
>                 Key: FLINK-2624
>                 URL: https://issues.apache.org/jira/browse/FLINK-2624
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>   Affects Versions: 0.10
>            Reporter: Stephan Ewen
>            Assignee: Hilmi Yildirim
>
> The RabbitMQ connector does not offer any fault tolerance guarantees right now, because it does not participate in the checkpointing.
> We should integrate it in a similar way as the {{FlinkKafkaConsumer}} is integrated.
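
The locking described in the comment above could look roughly like the sketch below. It is only an illustration under stated assumptions, not the connector's actual code. To stay self-contained it does not use Flink or the RabbitMQ client: {{SourceContext}}, {{Delivery}}, {{ListContext}} and {{TaggedSource}} are hypothetical local stand-ins, and only the method names {{collect()}} and {{getCheckpointLock()}} are borrowed from the source context mentioned in the comment. It also assumes that {{snapshotState()}} takes the lock itself, to model a snapshot that cannot overlap with the emit loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Self-contained sketch: every type here is a local stand-in, not a Flink or RabbitMQ class.
class CheckpointLockSketch {

    /** Stand-in for the source context: an output collector plus the checkpoint lock. */
    interface SourceContext<T> {
        void collect(T element);
        Object getCheckpointLock();
    }

    /** Hypothetical stand-in for a RabbitMQ delivery (delivery tag plus payload). */
    static final class Delivery {
        final long deliveryTag;
        final String body;

        Delivery(long deliveryTag, String body) {
            this.deliveryTag = deliveryTag;
            this.body = body;
        }
    }

    /** Simple context that appends emitted elements to a list. */
    static final class ListContext implements SourceContext<String> {
        final List<String> out = new ArrayList<>();
        private final Object lock = new Object();

        @Override public void collect(String element) { out.add(element); }
        @Override public Object getCheckpointLock() { return lock; }
    }

    /** Source whose checkpointed state is the tag of the last message it emitted. */
    static final class TaggedSource {
        private volatile boolean running = true;
        private long lastProcessedMessage = -1L;

        void run(SourceContext<String> ctx, Iterable<Delivery> deliveries) {
            final Object lock = ctx.getCheckpointLock();
            for (Delivery delivery : deliveries) {
                if (!running) {
                    break;
                }
                // State update and emit happen under the same lock, so a snapshot
                // never sees a tag whose element has not been emitted yet.
                synchronized (lock) {
                    lastProcessedMessage = delivery.deliveryTag;
                    ctx.collect(delivery.body);
                }
            }
        }

        /** Assumption of this sketch: the snapshot acquires the checkpoint lock itself. */
        long snapshotState(Object checkpointLock) {
            synchronized (checkpointLock) {
                return lastProcessedMessage;
            }
        }

        void cancel() { running = false; }
    }

    public static void main(String[] args) {
        ListContext ctx = new ListContext();
        TaggedSource source = new TaggedSource();
        source.run(ctx, Arrays.asList(new Delivery(1L, "a"), new Delivery(2L, "b")));
        System.out.println("emitted=" + ctx.out
                + " lastProcessedMessage=" + source.snapshotState(ctx.getCheckpointLock()));
    }
}
```

Because the tag assignment and the {{collect()}} call sit in one synchronized block, a snapshot taken under the same lock always corresponds to a fully emitted element. Moving either statement outside the block would reopen the window that the comment warns about.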