Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AF0E217C5B for ; Sun, 19 Apr 2015 14:47:21 +0000 (UTC) Received: (qmail 531 invoked by uid 500); 19 Apr 2015 14:47:21 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 456 invoked by uid 500); 19 Apr 2015 14:47:21 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 447 invoked by uid 99); 19 Apr 2015 14:47:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 19 Apr 2015 14:47:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1451BE0324; Sun, 19 Apr 2015 14:47:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: junrao@apache.org To: commits@kafka.apache.org Message-Id: <9674922c7da646acb50b29c0226bc8ba@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: kafka-1982; change kafka.examples.Producer to use the new java producer; patched by Ashish Singh; reviewed by Gwen Shapira, Mayuresh Gharat and Jun Rao Date: Sun, 19 Apr 2015 14:47:21 +0000 (UTC) Repository: kafka Updated Branches: refs/heads/trunk 185eb9b59 -> 5408931a2 kafka-1982; change kafka.examples.Producer to use the new java producer; patched by Ashish Singh; reviewed by Gwen Shapira, Mayuresh Gharat and Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5408931a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5408931a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5408931a Branch: refs/heads/trunk Commit: 5408931a2955adf8cb9d604d17dc9b5bd47b4d57 Parents: 185eb9b Author: Ashish Singh Authored: Sun Apr 19 07:46:58 2015 -0700 Committer: Jun Rao Committed: Sun Apr 19 07:46:58 2015 -0700 ---------------------------------------------------------------------- .../common/serialization/SerializationTest.java | 20 +++++ examples/README | 5 +- .../src/main/java/kafka/examples/Consumer.java | 15 ++-- .../examples/KafkaConsumerProducerDemo.java | 3 +- .../src/main/java/kafka/examples/Producer.java | 83 ++++++++++++++++---- .../java/kafka/examples/SimpleConsumerDemo.java | 4 +- 6 files changed, 105 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5408931a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java index f5cd61c..383bf48 100644 --- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java +++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java @@ -54,6 +54,26 @@ public class SerializationTest { } } + @Test + public void testIntegerSerializer() { + Integer[] integers = new Integer[]{ + 423412424, + -41243432 + }; + String mytopic = "testTopic"; + + Serializer serializer = new IntegerSerializer(); + Deserializer deserializer = new IntegerDeserializer(); + + for (Integer integer : integers) { + assertEquals("Should get the original integer after serialization and deserialization", + integer, deserializer.deserialize(mytopic, serializer.serialize(mytopic, integer))); + } + + assertEquals("Should support null in serialization and deserialization", + null, deserializer.deserialize(mytopic, serializer.serialize(mytopic, null))); + } + private SerDeser getStringSerDeser(String encoder) { Map serializerConfigs = new HashMap(); serializerConfigs.put("key.serializer.encoding", encoder); http://git-wip-us.apache.org/repos/asf/kafka/blob/5408931a/examples/README ---------------------------------------------------------------------- diff --git a/examples/README b/examples/README index 53db696..f6e3410 100644 --- a/examples/README +++ b/examples/README @@ -3,6 +3,7 @@ This directory contains examples of client code that uses kafka. To run the demo: 1. Start Zookeeper and the Kafka server - 2. For simple consumer demo, run bin/java-simple-consumer-demo.sh - 3. For unlimited producer-consumer run, run bin/java-producer-consumer-demo.sh + 2. For simple consumer demo, `run bin/java-simple-consumer-demo.sh` + 3. For unlimited sync-producer-consumer run, `run bin/java-producer-consumer-demo.sh sync` + 4. For unlimited async-producer-consumer run, `run bin/java-producer-consumer-demo.sh` http://git-wip-us.apache.org/repos/asf/kafka/blob/5408931a/examples/src/main/java/kafka/examples/Consumer.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java index 13135b9..8af64d8 100644 --- a/examples/src/main/java/kafka/examples/Consumer.java +++ b/examples/src/main/java/kafka/examples/Consumer.java @@ -17,14 +17,15 @@ package kafka.examples; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.ConsumerConfig; -import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; +import kafka.message.MessageAndMetadata; public class Consumer extends Thread @@ -54,11 +55,13 @@ public class Consumer extends Thread public void run() { Map topicCountMap = new HashMap(); - topicCountMap.put(topic, new Integer(1)); + topicCountMap.put(topic, 1); Map>> consumerMap = consumer.createMessageStreams(topicCountMap); - KafkaStream stream = consumerMap.get(topic).get(0); - ConsumerIterator it = stream.iterator(); - while(it.hasNext()) - System.out.println(new String(it.next().message())); + KafkaStream stream = consumerMap.get(topic).get(0); + for (MessageAndMetadata messageAndMetadata : stream) { + System.out.println("Received message: (" + ByteBuffer.wrap(messageAndMetadata.key()).getInt() + + ", " + + "" + new String(messageAndMetadata.message()) + ")"); + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/5408931a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java index 1239394..e96991a 100644 --- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java +++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java @@ -20,7 +20,8 @@ public class KafkaConsumerProducerDemo implements KafkaProperties { public static void main(String[] args) { - Producer producerThread = new Producer(KafkaProperties.topic); + final boolean isAsync = args.length > 0 ? !args[0].trim().toLowerCase().equals("sync") : true; + Producer producerThread = new Producer(KafkaProperties.topic, isAsync); producerThread.start(); Consumer consumerThread = new Consumer(KafkaProperties.topic); http://git-wip-us.apache.org/repos/asf/kafka/blob/5408931a/examples/src/main/java/kafka/examples/Producer.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java index 96e9893..ccc9925 100644 --- a/examples/src/main/java/kafka/examples/Producer.java +++ b/examples/src/main/java/kafka/examples/Producer.java @@ -18,33 +18,88 @@ package kafka.examples; import java.util.Properties; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; +import java.util.concurrent.ExecutionException; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; public class Producer extends Thread { - private final kafka.javaapi.producer.Producer producer; + private final KafkaProducer producer; private final String topic; - private final Properties props = new Properties(); + private final Boolean isAsync; - public Producer(String topic) + public Producer(String topic, Boolean isAsync) { - props.put("serializer.class", "kafka.serializer.StringEncoder"); - props.put("metadata.broker.list", "localhost:9092"); - // Use random partitioner. Don't need the key type. Just set it to Integer. - // The message is of type String. - producer = new kafka.javaapi.producer.Producer(new ProducerConfig(props)); + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("client.id", "DemoProducer"); + props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producer = new KafkaProducer(props); this.topic = topic; + this.isAsync = isAsync; } - + public void run() { int messageNo = 1; while(true) { - String messageStr = new String("Message_" + messageNo); - producer.send(new KeyedMessage(topic, messageStr)); - messageNo++; + String messageStr = "Message_" + messageNo; + long startTime = System.currentTimeMillis(); + if (isAsync) { // Send asynchronously + producer.send(new ProducerRecord(topic, + messageNo, + messageStr), new DemoCallBack(startTime, messageNo, messageStr)); + } else { // Send synchronously + try { + producer.send(new ProducerRecord(topic, + messageNo, + messageStr)).get(); + System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")"); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } + } + ++messageNo; } } +} + +class DemoCallBack implements Callback { + + private long startTime; + private int key; + private String message; + public DemoCallBack(long startTime, int key, String message) { + this.startTime = startTime; + this.key = key; + this.message = message; + } + + /** + * A callback method the user can implement to provide asynchronous handling of request completion. This method will + * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be + * non-null. + * + * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error + * occurred. + * @param exception The exception thrown during processing of this record. Null if no error occurred. + */ + public void onCompletion(RecordMetadata metadata, Exception exception) { + long elapsedTime = System.currentTimeMillis() - startTime; + if (metadata != null) { + System.out.println( + "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() + + "), " + + "offset(" + metadata.offset() + ") in " + elapsedTime + " ms"); + } else { + exception.printStackTrace(); + } + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/5408931a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java index 0d66fe5..e5096f0 100644 --- a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java +++ b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java @@ -42,9 +42,9 @@ public class SimpleConsumerDemo { } private static void generateData() { - Producer producer2 = new Producer(KafkaProperties.topic2); + Producer producer2 = new Producer(KafkaProperties.topic2, false); producer2.start(); - Producer producer3 = new Producer(KafkaProperties.topic3); + Producer producer3 = new Producer(KafkaProperties.topic3, false); producer3.start(); try { Thread.sleep(1000);