apex-dev mailing list archives

From chinmaykolhatkar <...@git.apache.org>
Subject [GitHub] apex-malhar pull request #298: *For review only* [APEXMALHAR-2086] Kafka out...
Date Thu, 09 Jun 2016 00:27:06 GMT
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/298#discussion_r66365356
  
    --- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractExactlyOnceKafkaOutputOperator.java
---
    @@ -0,0 +1,385 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.apex.malhar.kafka;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.ExecutionException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
    +import org.apache.apex.malhar.lib.wal.WindowDataManager;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRecord;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.TopicPartition;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.Operator;
    +
    +//       1. JavaDoc
    +//       2. Unit Test
    +//       3. Comparator -- not required
    +//       4. Generic Type
    +//       5. remove e.printStackTrace()
    +//       6. Should the Consumer be kept open?
    +
    +/**
    + * This is a base implementation of a Kafka output operator
    + * which, in most cases, guarantees to send tuples to Kafka only once.
    + * Subclasses should implement the methods for converting tuples into a format appropriate for Kafka.
    + * <p>
    + * Assuming the messages kept in Kafka are ordered by key, by value, or by key-value pair
    + * (for example, using a timestamp as the key), this Kafka output operator always retrieves
    + * the last message from Kafka as the initial offset, so that replayed messages are not sent to Kafka again.
    + *
    + * This is not "perfectly exactly once" in 2 cases:
    + * 1. Multiple producers produce messages to the same Kafka partition.
    + * 2. The same message has been sent out, and the operator is restarted before Kafka has
    + * synchronized this message among all the brokers.
    + *
    + * <br>
    + * Ports:<br>
    + * <b>Input</b>: One input port<br>
    + * <b>Output</b>: No output port<br>
    + * <br>
    + * Properties:<br>
    + * configProperties<br>
    + * <br>
    + * Compile time checks:<br>
    + * The derived class has to implement these 2 methods:<br>
    + * tupleToKeyValue() to convert input tuples to Kafka key-value objects<br>
    + * compareToLastMsg() to compare an incoming tuple with the last message received from Kafka so
    + * that the operator can skip the tuples that were already sent<br>
    + * <br>
    + * Run time checks:<br>
    + * None<br>
    + * <br>
    + * Benchmarks:<br>
    + * TBD<br>
    + * </p>
    + *
    + * @displayName Abstract Exactly Once Kafka Output(0.9.0)
    + * @category Messaging
    + * @tags output operator
    + *
    + * @since 3.5
    + */
    +public abstract class AbstractExactlyOnceKafkaOutputOperator<T> extends AbstractKafkaOutputOperator<String, T>
    +    implements Operator.CheckpointNotificationListener
    +{
    +  private WindowDataManager windowDataManager = new FSWindowDataManager();
    +  private String key;
    +  private Integer operatorId;
    +  private String appId;
    +  private transient Long windowId;
    +  private transient Set<T> recoveredTuples = new HashSet<>();
    +  private static final int KAFKA_CONNECT_ATTEMPT = 10;
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    super.setup(context);
    +    this.operatorId = context.getId();
    +    this.windowDataManager.setup(context);
    +    this.appId = context.getValue(Context.DAGContext.APPLICATION_ID);
    +    this.key = appId + '#' + operatorId;
    +  }
    +
    +  @Override
    +  public void beginWindow(long windowId)
    +  {
    +    this.windowId = windowId;
    +
    +    if (windowId == windowDataManager.getLargestRecoveryWindow()) {
    +      rebuildLastWindow();
    +    }
    +  }
    +
    +  // Only the tuples in the incomplete window need to be read back from Kafka.
    +  // Tuples re-sent from already-completed windows are not written to Kafka again.
    +  private void rebuildLastWindow()
    +  {
    +    recoveredTuples.clear();
    +
    +    Map<Integer,Long> storedOffsets = getStoredOffsets();
    +    Map<Integer,Long> currentOffsets = getCurrentOffsets();
    +
    +    if (storedOffsets == null || currentOffsets == null) {
    +      //TODO: Take some action
    +      return;
    +    }
    +
    +    KafkaConsumer<String, String> consumer = kafkaConsumerInit();
    +
    +    List<TopicPartition> topicPartitions = new ArrayList<>();
    +
    +    for (Map.Entry<Integer,Long> entry: currentOffsets.entrySet()) {
    +      topicPartitions.add(new TopicPartition(getTopic(), entry.getKey()));
    +    }
    +
    +    consumer.assign(topicPartitions);
    +
    +    // For each partition of the topic:
    +    // 1. Read the tuples between the stored offset and the latest offset
    +    // 2. Check the key to filter out messages not written by this operator instance
    +    // 3. Store the recovered tuples in the HashSet
    +
    +    for (Map.Entry<Integer,Long> entry: currentOffsets.entrySet()) {
    +      Long storedOffset = 0L;
    +      Integer currentPartition = entry.getKey();
    +      Long currentOffset = entry.getValue();
    +
    +      if (storedOffsets.containsKey(currentPartition)) {
    +        storedOffset = storedOffsets.get(currentPartition);
    +      }
    +
    +      if (storedOffset >= currentOffset) {
    +        continue;
    +      }
    +
    +      consumer.seek(new TopicPartition(getTopic(), currentPartition), storedOffset);
    +
    +      int kafkaAttempt = 0;
    +
    +      while (true) {
    +
    +        ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
    +
    +        if (consumerRecords.count() == 0) {
    +          ++kafkaAttempt;
    +
    +          if (kafkaAttempt == KAFKA_CONNECT_ATTEMPT) {
    +            break;
    +          }
    +        } else {
    +          kafkaAttempt = 0;
    +        }
    +
    +        boolean crossedBoundary = false;
    +
    +        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
    +          if (!doesKeyBelongsThisInstance(operatorId, consumerRecord.key())) {
    +            continue;
    +          }
    +
    +          recoveredTuples.add((T)consumerRecord.value());
    +
    +          if (consumerRecord.offset() >= currentOffset) {
    +            crossedBoundary = true;
    +            break;
    +          }
    +        }
    +
    +        if (crossedBoundary) {
    +          break;
    +        }
    +      }
    +    }
    +
    +    consumer.close();
    +  }
    +
    +  private Map<Integer,Long> getCurrentOffsets()
    +  {
    +    Map<Integer, Long> currentOffsets = null;
    +
    +    try {
    +      currentOffsets = getPartitionsAndOffsets();
    +    } catch (ExecutionException | InterruptedException e) {
    +      e.printStackTrace();
    +    }
    +
    +    return currentOffsets;
    +  }
    +
    +  private Map<Integer,Long> getStoredOffsets()
    +  {
    +    Map<Integer,Long> storedOffsets = null;
    +    try {
    +      storedOffsets = (Map<Integer,Long>)this.windowDataManager.load(operatorId, windowId);
    +    } catch (IOException e) {
    +      e.printStackTrace();
    +    }
    +
    +    return storedOffsets;
    +  }
    +
    +  private KafkaConsumer<String, String> kafkaConsumerInit()
    +  {
    +    Properties props = new Properties();
    +    props.put("bootstrap.servers", getConfigProperties().get("bootstrap.servers"));
    --- End diff --
    
    Can you use Java constants instead of hardcoding the property strings?
    You can find the public static String constants in org.apache.kafka.clients.producer.ProducerConfig.
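    
    For reference, a minimal sketch of the suggestion (an illustrative assumption, not code from the PR; since this particular line configures a consumer, org.apache.kafka.clients.consumer.ConsumerConfig exposes the same constant as ProducerConfig):
    
        import java.util.Properties;
    
        import org.apache.kafka.clients.consumer.ConsumerConfig;
    
        Properties props = new Properties();
        // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG resolves to "bootstrap.servers"; a misspelled
        // constant name fails at compile time, while a misspelled string literal only fails at runtime.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            getConfigProperties().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));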


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
