flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetzger <...@git.apache.org>
Subject [GitHub] flink pull request: [FLINK-1753] Added Kafka connectors test and c...
Date Mon, 13 Apr 2015 10:08:15 GMT
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/589#discussion_r28227177
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
---
    @@ -65,36 +75,145 @@ public void createTopic(String topicName, int numOfPartitions, int
replicationFa
     				LOG.warn("Kafka topic \"{}\" already exists. Returning without action.", topicName);
     			}
     		} else {
    +			LOG.info("Connecting zookeeper");
    +
    +			initZkClient();
     			AdminUtils.createTopic(zkClient, topicName, numOfPartitions, replicationFactor, topicConfig);
    +			closeZkClient();
    +		}
    +	}
    +
    +	public String getBrokerList(String topicName) {
    +		return getBrokerAddressList(getBrokerAddresses(topicName));
    +	}
    +
    +	public String getBrokerList(String topicName, int partitionId) {
    +		return getBrokerAddressList(getBrokerAddresses(topicName, partitionId));
    +	}
    +
    +	public Set<String> getBrokerAddresses(String topicName) {
    +		int numOfPartitions = getNumberOfPartitions(topicName);
    +
    +		HashSet<String> brokers = new HashSet<String>();
    +		for (int i = 0; i < numOfPartitions; i++) {
    +			brokers.addAll(getBrokerAddresses(topicName, i));
     		}
    +		return brokers;
    +	}
    +
    +	public Set<String> getBrokerAddresses(String topicName, int partitionId) {
    +		PartitionMetadata partitionMetadata = waitAndGetPartitionMetadata(topicName, partitionId);
    +		Collection<Broker> inSyncReplicas = JavaConversions.asJavaCollection(partitionMetadata.isr());
    +
    +		HashSet<String> addresses = new HashSet<String>();
    +		for (Broker broker : inSyncReplicas) {
    +			addresses.add(broker.connectionString());
    +		}
    +		return addresses;
    +	}
    +
    +	private static String getBrokerAddressList(Set<String> brokerAddresses) {
    +		StringBuilder brokerAddressList = new StringBuilder("");
    +		for (String broker : brokerAddresses) {
    +			brokerAddressList.append(broker);
    +			brokerAddressList.append(',');
    +		}
    +		brokerAddressList.deleteCharAt(brokerAddressList.length() - 1);
    +
    +		return brokerAddressList.toString();
     	}
     
     	public int getNumberOfPartitions(String topicName) {
    -		Seq<PartitionMetadata> partitionMetadataSeq = getTopicInfo(topicName).partitionsMetadata();
    +		Seq<PartitionMetadata> partitionMetadataSeq = getTopicMetadata(topicName).partitionsMetadata();
     		return JavaConversions.asJavaCollection(partitionMetadataSeq).size();
     	}
     
    -	public String getLeaderBrokerAddressForTopic(String topicName) {
    -		TopicMetadata topicInfo = getTopicInfo(topicName);
    +	public PartitionMetadata waitAndGetPartitionMetadata(String topicName, int partitionId)
{
    +		PartitionMetadata partitionMetadata;
    +		while (true) {
    +			try {
    +				partitionMetadata = getPartitionMetadata(topicName, partitionId);
    +				return partitionMetadata;
    +			} catch (LeaderNotAvailableException e) {
    +				// try fetching metadata again
    --- End diff --
    
    I would suggest logging the exception with LOG.debug


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to do so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message