flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From StephanEwen <...@git.apache.org>
Subject [GitHub] flink pull request: [streaming] [wip] Fault tolerance prototype (f...
Date Sun, 08 Mar 2015 22:32:09 GMT
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/459#discussion_r26010038
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
---
    @@ -0,0 +1,304 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.api.simple;
    +
    +import java.io.Serializable;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +
    +import kafka.api.FetchRequest;
    +import kafka.api.FetchRequestBuilder;
    +import kafka.api.PartitionOffsetRequestInfo;
    +import kafka.common.TopicAndPartition;
    +import kafka.javaapi.FetchResponse;
    +import kafka.javaapi.OffsetResponse;
    +import kafka.javaapi.PartitionMetadata;
    +import kafka.javaapi.TopicMetadata;
    +import kafka.javaapi.TopicMetadataRequest;
    +import kafka.javaapi.consumer.SimpleConsumer;
    +import kafka.message.MessageAndOffset;
    +
    +/**
    + * Iterates the records received from a partition of a Kafka topic as byte arrays.
    + */
    +public class KafkaConsumerIterator implements Serializable {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private static final long DEFAULT_WAIT_ON_EMPTY_FETCH = 1000L;
    +
    +	private List<String> hosts;
    +	private String topic;
    +	private int port;
    +	private int partition;
    +	private long readOffset;
    +	private long waitOnEmptyFetch;
    +	private transient SimpleConsumer consumer;
    +	private List<String> replicaBrokers;
    +	private String clientName;
    +
    +	private transient Iterator<MessageAndOffset> iter;
    +	private transient FetchResponse fetchResponse;
    +
    +	/**
    +	 * Constructor with configurable wait time on empty fetch. For connecting to the Kafka
service
    +	 * we use the so called simple or low level Kafka API thus directly connecting to one
of the brokers.
    +	 *
    +	 * @param hostName Hostname of a known Kafka broker
    +	 * @param port Port of the known Kafka broker
    +	 * @param topic Name of the topic to listen to
    +	 * @param partition Partition in the chosen topic
    +	 * @param waitOnEmptyFetch wait time on empty fetch in millis
    +	 */
    +	public KafkaConsumerIterator(String hostName, int port, String topic, int partition,
    +			long waitOnEmptyFetch) {
    +
    +		this.hosts = new ArrayList<String>();
    +		hosts.add(hostName);
    +		this.port = port;
    +
    +		this.topic = topic;
    +		this.partition = partition;
    +		this.waitOnEmptyFetch = waitOnEmptyFetch;
    +
    +		replicaBrokers = new ArrayList<String>();
    +	}
    +
    +	/**
    +	 * Constructor without configurable wait time on empty fetch. For connecting to the
Kafka service
    +	 * we use the so called simple or low level Kafka API thus directly connecting to one
of the brokers.
    +	 *
    +	 * @param hostName Hostname of a known Kafka broker
    +	 * @param port Port of the known Kafka broker
    +	 * @param topic Name of the topic to listen to
    +	 * @param partition Partition in the chosen topic
    +	 */
    +	public KafkaConsumerIterator(String hostName, int port, String topic, int partition){
    +		this(hostName, port, topic, partition, DEFAULT_WAIT_ON_EMPTY_FETCH);
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Initializing a connection
    +	// --------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Initializes the connection by detecting the leading broker of
    +	 * the topic and establishing a connection to it.
    +	 */
    +	private void initialize() throws InterruptedException {
    +		PartitionMetadata metadata;
    +		do {
    +			metadata = findLeader(hosts, port, topic, partition);
    +			try {
    +				Thread.sleep(waitOnEmptyFetch);
    --- End diff --
    
    Why sleep even if the `findLeader(...)` call was successful?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message