kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "DHRUV BANSAL (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (KAFKA-6339) Integration test with embedded kafka not working
Date Sat, 09 Dec 2017 18:37:00 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-6339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

DHRUV BANSAL updated KAFKA-6339:
--------------------------------
    Description: 
I am using Kafka version - 0.11.0.2

Trying to write an integration test for one of the components I am writing over Kafka.
The following code works fine with Kafka version 0.10.0.0 but does not work with the mentioned version
(0.11.0.2)
{{
// setup Zookeeper
		{{EmbeddedZookeeper embeddedZookeeper = new EmbeddedZookeeper();
		String zkConnect = ZKHOST + ":" + embeddedZookeeper.port();
		ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
		ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
}}
		// setup Broker
		Properties brokerProps = new Properties();
		brokerProps.setProperty("zookeeper.connect", zkConnect);
		brokerProps.setProperty("broker.id", "0");
		brokerProps.setProperty("offsets.topic.replication.factor", "1");
		String kafka_log_path = Files.createTempDirectory("kafka-").toAbsolutePath().toString();
		System.out.println("kafka log path " + kafka_log_path);
		brokerProps.setProperty("log.dirs", kafka_log_path);
		brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
		KafkaConfig config = new KafkaConfig(brokerProps);
		Time mock = new MockTime();
		KafkaServer kafkaServer = TestUtils.createServer(config, mock);

		// create topic
		AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);

		// setup producer
		Properties producerProps = new Properties();
		producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
		producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
		producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
		KafkaProducer<Integer, byte[]> producer = new KafkaProducer<Integer, byte[]>(producerProps);

		// setup consumer
		Properties consumerProps = new Properties();
		consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
		consumerProps.setProperty("group.id", "group0");
		consumerProps.setProperty("client.id", "consumer0");
		consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
		consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
		consumerProps.put("auto.offset.reset", "earliest"); // to make sure the consumer starts
from the beginning of
															// the topic

		KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
		consumer.subscribe(Arrays.asList(TOPIC));

		// send message
		ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(TOPIC, 42,
				"test-message".getBytes(StandardCharsets.UTF_8));
		Future<RecordMetadata> record = producer.send(data);
		RecordMetadata metadata = record.get();

		// starting consumer
		ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
		assertEquals(1, records.count());
		Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator();
		ConsumerRecord<Integer, byte[]> consumedRecord = recordIterator.next();
		System.out.printf("offset = %d, key = %s, value = %s", consumedRecord.offset(), consumedRecord.key(),
				consumedRecord.value());
		assertEquals(42, (int) consumedRecord.key());
		assertEquals("test-message", new String(consumedRecord.value(), StandardCharsets.UTF_8));

		kafkaServer.shutdown();
		zkClient.close();
		embeddedZookeeper.shutdown();
}}

Please provide support for this use case; there should also be proper documentation for the integration
test with each release.

  was:
I am using Kafka version - 0.11.0.2

Trying to write an integration test for one of the components I am writing over Kafka.
The following code works fine with Kafka version 0.10.0.0 but does not work with the mentioned version
(0.11.0.2)
{{
// setup Zookeeper
		EmbeddedZookeeper embeddedZookeeper = new EmbeddedZookeeper();
		String zkConnect = ZKHOST + ":" + embeddedZookeeper.port();
		ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
		ZkUtils zkUtils = ZkUtils.apply(zkClient, false);

		// setup Broker
		Properties brokerProps = new Properties();
		brokerProps.setProperty("zookeeper.connect", zkConnect);
		brokerProps.setProperty("broker.id", "0");
		brokerProps.setProperty("offsets.topic.replication.factor", "1");
		String kafka_log_path = Files.createTempDirectory("kafka-").toAbsolutePath().toString();
		System.out.println("kafka log path " + kafka_log_path);
		brokerProps.setProperty("log.dirs", kafka_log_path);
		brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
		KafkaConfig config = new KafkaConfig(brokerProps);
		Time mock = new MockTime();
		KafkaServer kafkaServer = TestUtils.createServer(config, mock);

		// create topic
		AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);

		// setup producer
		Properties producerProps = new Properties();
		producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
		producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
		producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
		KafkaProducer<Integer, byte[]> producer = new KafkaProducer<Integer, byte[]>(producerProps);

		// setup consumer
		Properties consumerProps = new Properties();
		consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
		consumerProps.setProperty("group.id", "group0");
		consumerProps.setProperty("client.id", "consumer0");
		consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
		consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
		consumerProps.put("auto.offset.reset", "earliest"); // to make sure the consumer starts
from the beginning of
															// the topic

		KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
		consumer.subscribe(Arrays.asList(TOPIC));

		// send message
		ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(TOPIC, 42,
				"test-message".getBytes(StandardCharsets.UTF_8));
		Future<RecordMetadata> record = producer.send(data);
		RecordMetadata metadata = record.get();

		// starting consumer
		ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
		assertEquals(1, records.count());
		Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator();
		ConsumerRecord<Integer, byte[]> consumedRecord = recordIterator.next();
		System.out.printf("offset = %d, key = %s, value = %s", consumedRecord.offset(), consumedRecord.key(),
				consumedRecord.value());
		assertEquals(42, (int) consumedRecord.key());
		assertEquals("test-message", new String(consumedRecord.value(), StandardCharsets.UTF_8));

		kafkaServer.shutdown();
		zkClient.close();
		embeddedZookeeper.shutdown();
}}

Please provide support for this use case; there should also be proper documentation for the integration
test with each release.


> Integration test with embedded kafka not working
> ------------------------------------------------
>
>                 Key: KAFKA-6339
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6339
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.11.0.2
>            Reporter: DHRUV BANSAL
>
> I am using Kafka version - 0.11.0.2
> Trying to write an integration test for one of the components I am writing over Kafka.
> The following code works fine with Kafka version 0.10.0.0 but does not work with the mentioned
version (0.11.0.2)
> {{
> // setup Zookeeper
> 		{{EmbeddedZookeeper embeddedZookeeper = new EmbeddedZookeeper();
> 		String zkConnect = ZKHOST + ":" + embeddedZookeeper.port();
> 		ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
> 		ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
> }}
> 		// setup Broker
> 		Properties brokerProps = new Properties();
> 		brokerProps.setProperty("zookeeper.connect", zkConnect);
> 		brokerProps.setProperty("broker.id", "0");
> 		brokerProps.setProperty("offsets.topic.replication.factor", "1");
> 		String kafka_log_path = Files.createTempDirectory("kafka-").toAbsolutePath().toString();
> 		System.out.println("kafka log path " + kafka_log_path);
> 		brokerProps.setProperty("log.dirs", kafka_log_path);
> 		brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
> 		KafkaConfig config = new KafkaConfig(brokerProps);
> 		Time mock = new MockTime();
> 		KafkaServer kafkaServer = TestUtils.createServer(config, mock);
> 		// create topic
> 		AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
> 		// setup producer
> 		Properties producerProps = new Properties();
> 		producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
> 		producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
> 		producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
> 		KafkaProducer<Integer, byte[]> producer = new KafkaProducer<Integer, byte[]>(producerProps);
> 		// setup consumer
> 		Properties consumerProps = new Properties();
> 		consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
> 		consumerProps.setProperty("group.id", "group0");
> 		consumerProps.setProperty("client.id", "consumer0");
> 		consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
> 		consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> 		consumerProps.put("auto.offset.reset", "earliest"); // to make sure the consumer starts
from the beginning of
> 															// the topic
> 		KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
> 		consumer.subscribe(Arrays.asList(TOPIC));
> 		// send message
> 		ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(TOPIC, 42,
> 				"test-message".getBytes(StandardCharsets.UTF_8));
> 		Future<RecordMetadata> record = producer.send(data);
> 		RecordMetadata metadata = record.get();
> 		// starting consumer
> 		ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
> 		assertEquals(1, records.count());
> 		Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator();
> 		ConsumerRecord<Integer, byte[]> consumedRecord = recordIterator.next();
> 		System.out.printf("offset = %d, key = %s, value = %s", consumedRecord.offset(), consumedRecord.key(),
> 				consumedRecord.value());
> 		assertEquals(42, (int) consumedRecord.key());
> 		assertEquals("test-message", new String(consumedRecord.value(), StandardCharsets.UTF_8));
> 		kafkaServer.shutdown();
> 		zkClient.close();
> 		embeddedZookeeper.shutdown();
> }}
> Please provide support for this use case; there should also be proper documentation for the
integration test with each release.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message