From: chinmaykolhatkar
To: dev@apex.incubator.apache.org
Reply-To: dev@apex.incubator.apache.org
Subject: [GitHub] apex-malhar pull request #298: *For review only* [APEXMALHAR-2086] Kafka out...
Message-Id: <20160609002706.3DD9BDFF68@git1-us-west.apache.org>
Date: Thu, 9 Jun 2016 00:27:06 +0000 (UTC)

Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/298#discussion_r66365356

--- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractExactlyOnceKafkaOutputOperator.java ---
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+
+// TODO:
+// 1. JavaDoc
+// 2. Unit test
+// 3. Comparator -- not required
+// 4. Generic type
+// 5. Remove e.printStackTrace()
+// 6. Should the consumer be kept open?
+
+/**
+ * This is a base implementation of a Kafka output operator which, in most cases,
+ * guarantees to send tuples to Kafka only once. Subclasses should implement the
+ * methods for converting tuples into a format appropriate for Kafka.
+ *
+ * Assuming the messages kept in Kafka are ordered by key, by value, or by key-value
+ * pair (for example, using timestamps as keys), this output operator always retrieves
+ * the last message from Kafka to determine the initial offset, so that replayed
+ * messages are not sent to Kafka again.
+ *
+ * This is not a perfect "exactly once" guarantee in two cases:
+ * 1. Multiple producers produce messages to the same Kafka partition.
+ * 2. The same message was already sent out, and the operator is restarted before
+ *    Kafka has synchronized that message among all the brokers.
+ *
+ * Ports:<br>
+ * Input: One input port<br>
+ * Output: No output port<br>
+ * <br>
+ * Properties:<br>
+ * configProperties<br>
+ * <br>
+ * Compile time checks:<br>
+ * A derived class has to implement these 2 methods (see the sketch below):<br>
+ * tupleToKeyValue() to convert input tuples to Kafka key-value objects<br>
+ * compareToLastMsg() to compare an incoming tuple with the last message received
+ * from Kafka, so that the operator can skip the tuples that were already sent<br>
+ * <br>
+ * Run time checks:<br>
+ * None<br>
+ * <br>
+ * Benchmarks:<br>
+ * TBD<br>
+ * <br>
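+ * For illustration only, a subclass sketch is shown below. The two method names are
+ * the ones this JavaDoc requires; their exact signatures are not part of this diff
+ * (and the TODO above even questions whether the comparator is required), so the
+ * signatures, the Pair type (org.apache.commons.lang3.tuple.Pair) and the
+ * extractTimestamp() helper are assumptions.
+ * <pre>{@code
+ * public class TimestampedKafkaOutputOperator
+ *     extends AbstractExactlyOnceKafkaOutputOperator<String>
+ * {
+ *   @Override
+ *   protected Pair<String, String> tupleToKeyValue(String tuple)
+ *   {
+ *     // Hypothetical helper: derive a stable, ordered key (a timestamp) from the tuple.
+ *     return Pair.of(extractTimestamp(tuple), tuple);
+ *   }
+ *
+ *   @Override
+ *   protected int compareToLastMsg(String tuple, String lastMsg)
+ *   {
+ *     // Compare using the same ordering the messages stored in Kafka are assumed to have,
+ *     // so the operator can tell which replayed tuples were already sent.
+ *     return extractTimestamp(tuple).compareTo(extractTimestamp(lastMsg));
+ *   }
+ * }
+ * }</pre>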
+ *
+ * @displayName Abstract Exactly Once Kafka Output(0.9.0)
+ * @category Messaging
+ * @tags output operator
+ *
+ * @since 3.5
+ */
+public abstract class AbstractExactlyOnceKafkaOutputOperator<T> extends AbstractKafkaOutputOperator<String, T>
+    implements Operator.CheckpointNotificationListener
+{
+  private WindowDataManager windowDataManager = new FSWindowDataManager();
+  private String key;
+  private Integer operatorId;
+  private String appId;
+  private transient Long windowId;
+  private transient Set<T> recoveredTuples = new HashSet<>();
+  private static final int KAFKA_CONNECT_ATTEMPT = 10;
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    this.operatorId = context.getId();
+    this.windowDataManager.setup(context);
+    this.appId = context.getValue(Context.DAGContext.APPLICATION_ID);
+    // Key identifying this operator instance in the messages it writes to Kafka.
+    this.key = appId + '#' + operatorId;
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    this.windowId = windowId;
+
+    if (windowId == windowDataManager.getLargestRecoveryWindow()) {
+      rebuildLastWindow();
+    }
+  }
+
+  // Only the tuples in the incomplete window need to be read back from Kafka.
+  // Tuples replayed from previous, completed windows are not written to Kafka again.
+  private void rebuildLastWindow()
+  {
+    recoveredTuples.clear();
+
+    Map<Integer, Long> storedOffsets = getStoredOffsets();
+    Map<Integer, Long> currentOffsets = getCurrentOffsets();
+
+    if (storedOffsets == null || currentOffsets == null) {
+      //TODO: Take some action
+      return;
+    }
+
+    KafkaConsumer<String, T> consumer = KafkaConsumerInit();
+
+    List<TopicPartition> topicPartitions = new ArrayList<>();
+
+    for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
+      topicPartitions.add(new TopicPartition(getTopic(), entry.getKey()));
+    }
+
+    consumer.assign(topicPartitions);
+
+    // For each partition of the topic:
+    // 1. Read the tuples between the stored offset and the latest offset.
+    // 2. Check the key to filter out messages written by other operator instances.
+    // 3. Store the recovered tuples in the HashSet.
+
+    for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
+
+      Long storedOffset = 0L;
+      Integer currentPartition = entry.getKey();
+      Long currentOffset = entry.getValue();
+
+      if (storedOffsets.containsKey(currentPartition)) {
+        storedOffset = storedOffsets.get(currentPartition);
+      }
+
+      if (storedOffset >= currentOffset) {
+        continue;
+      }
+
+      consumer.seek(new TopicPartition(getTopic(), currentPartition), storedOffset);
+
+      int kafkaAttempt = 0;
+
+      while (true) {
+
+        ConsumerRecords<String, T> consumerRecords = consumer.poll(100);
+
+        if (consumerRecords.count() == 0) {
+          ++kafkaAttempt;
+
+          if (kafkaAttempt == KAFKA_CONNECT_ATTEMPT) {
+            break;
+          }
+        } else {
+          kafkaAttempt = 0;
+        }
+
+        boolean crossedBoundary = false;
+
+        for (ConsumerRecord<String, T> consumerRecord : consumerRecords) {
+
+          if (!doesKeyBelongsThisInstance(operatorId, consumerRecord.key())) {
+            continue;
+          }
+
+          recoveredTuples.add(consumerRecord.value());
+
+          if (consumerRecord.offset() >= currentOffset) {
+            crossedBoundary = true;
+            break;
+          }
+        }
+
+        if (crossedBoundary) {
+          break;
+        }
+      }
+    }
+
+    consumer.close();
+  }
+
+  private Map<Integer, Long> getCurrentOffsets()
+  {
+    Map<Integer, Long> currentOffsets = null;
+
+    try {
+      currentOffsets = getPartitionsAndOffsets();
+    } catch (ExecutionException e) {
+      e.printStackTrace();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+    return currentOffsets;
+  }
+
+  private Map<Integer, Long> getStoredOffsets()
+  {
+    Map<Integer, Long> storedOffsets = null;
+    try {
+      storedOffsets = (Map<Integer, Long>)this.windowDataManager.load(operatorId, windowId);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    return storedOffsets;
+  }
+
+  private KafkaConsumer<String, T> KafkaConsumerInit()
+  {
+    Properties props = new Properties();
+    props.put("bootstrap.servers", getConfigProperties().get("bootstrap.servers"));
--- End diff --

Can you use Java constants instead of hardcoding the property strings? You can find the public static String constants in org.apache.kafka.clients.producer.ProducerConfig.
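For example, something like the sketch below. Note that kafka-clients defines the
same constant in both ProducerConfig and its consumer-side counterpart,
org.apache.kafka.clients.consumer.ConsumerConfig; since this method builds a
KafkaConsumer, the consumer-side constant is arguably the better fit:

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;

    // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG resolves to the same "bootstrap.servers"
    // string, so behavior is unchanged, but typos become compile-time errors.
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
        getConfigProperties().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));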