kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: kafka-1982; change kafka.examples.Producer to use the new java producer; patched by Ashish Singh; reviewed by Gwen Shapira, Mayuresh Gharat and Jun Rao
Date Sun, 19 Apr 2015 14:47:21 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 185eb9b59 -> 5408931a2


kafka-1982; change kafka.examples.Producer to use the new java producer; patched by Ashish
Singh; reviewed by Gwen Shapira, Mayuresh Gharat and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5408931a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5408931a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5408931a

Branch: refs/heads/trunk
Commit: 5408931a2955adf8cb9d604d17dc9b5bd47b4d57
Parents: 185eb9b
Author: Ashish Singh <asingh@cloudera.com>
Authored: Sun Apr 19 07:46:58 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Sun Apr 19 07:46:58 2015 -0700

----------------------------------------------------------------------
 .../common/serialization/SerializationTest.java | 20 +++++
 examples/README                                 |  5 +-
 .../src/main/java/kafka/examples/Consumer.java  | 15 ++--
 .../examples/KafkaConsumerProducerDemo.java     |  3 +-
 .../src/main/java/kafka/examples/Producer.java  | 83 ++++++++++++++++----
 .../java/kafka/examples/SimpleConsumerDemo.java |  4 +-
 6 files changed, 105 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5408931a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index f5cd61c..383bf48 100644
--- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -54,6 +54,26 @@ public class SerializationTest {
         }
     }
 
+    @Test
+    public void testIntegerSerializer() {
+        Integer[] integers = new Integer[]{
+            423412424,
+            -41243432
+        };
+        String mytopic = "testTopic";
+
+        Serializer<Integer> serializer = new IntegerSerializer();
+        Deserializer<Integer> deserializer = new IntegerDeserializer();
+
+        for (Integer integer : integers) {
+            assertEquals("Should get the original integer after serialization and deserialization",
+                    integer, deserializer.deserialize(mytopic, serializer.serialize(mytopic,
integer)));
+        }
+
+        assertEquals("Should support null in serialization and deserialization",
+                null, deserializer.deserialize(mytopic, serializer.serialize(mytopic, null)));
+    }
+
     private SerDeser<String> getStringSerDeser(String encoder) {
         Map<String, Object> serializerConfigs = new HashMap<String, Object>();
         serializerConfigs.put("key.serializer.encoding", encoder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/5408931a/examples/README
----------------------------------------------------------------------
diff --git a/examples/README b/examples/README
index 53db696..f6e3410 100644
--- a/examples/README
+++ b/examples/README
@@ -3,6 +3,7 @@ This directory contains examples of client code that uses kafka.
 To run the demo: 
 
    1. Start Zookeeper and the Kafka server
-   2. For simple consumer demo, run bin/java-simple-consumer-demo.sh
-   3. For unlimited producer-consumer run, run bin/java-producer-consumer-demo.sh
+   2. For simple consumer demo, `run bin/java-simple-consumer-demo.sh`
+   3. For unlimited sync-producer-consumer run, `run bin/java-producer-consumer-demo.sh sync`
+   4. For unlimited async-producer-consumer run, `run bin/java-producer-consumer-demo.sh`
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5408931a/examples/src/main/java/kafka/examples/Consumer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java
index 13135b9..8af64d8 100644
--- a/examples/src/main/java/kafka/examples/Consumer.java
+++ b/examples/src/main/java/kafka/examples/Consumer.java
@@ -17,14 +17,15 @@
 package kafka.examples;
 
 
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
 
 
 public class Consumer extends Thread
@@ -54,11 +55,13 @@ public class Consumer extends Thread
  
   public void run() {
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
-    topicCountMap.put(topic, new Integer(1));
+    topicCountMap.put(topic, 1);
     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
-    KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);
-    ConsumerIterator<byte[], byte[]> it = stream.iterator();
-    while(it.hasNext())
-      System.out.println(new String(it.next().message()));
+    KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
+    for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
+      System.out.println("Received message: (" + ByteBuffer.wrap(messageAndMetadata.key()).getInt()
+
+          ", " +
+          "" + new String(messageAndMetadata.message()) + ")");
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5408931a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
index 1239394..e96991a 100644
--- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
@@ -20,7 +20,8 @@ public class KafkaConsumerProducerDemo implements KafkaProperties
 {
   public static void main(String[] args)
   {
-    Producer producerThread = new Producer(KafkaProperties.topic);
+    final boolean isAsync = args.length > 0 ? !args[0].trim().toLowerCase().equals("sync")
: true;
+    Producer producerThread = new Producer(KafkaProperties.topic, isAsync);
     producerThread.start();
 
     Consumer consumerThread = new Consumer(KafkaProperties.topic);

http://git-wip-us.apache.org/repos/asf/kafka/blob/5408931a/examples/src/main/java/kafka/examples/Producer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java
index 96e9893..ccc9925 100644
--- a/examples/src/main/java/kafka/examples/Producer.java
+++ b/examples/src/main/java/kafka/examples/Producer.java
@@ -18,33 +18,88 @@ package kafka.examples;
 
 
 import java.util.Properties;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 
 public class Producer extends Thread
 {
-  private final kafka.javaapi.producer.Producer<Integer, String> producer;
+  private final KafkaProducer<Integer, String> producer;
   private final String topic;
-  private final Properties props = new Properties();
+  private final Boolean isAsync;
 
-  public Producer(String topic)
+  public Producer(String topic, Boolean isAsync)
   {
-    props.put("serializer.class", "kafka.serializer.StringEncoder");
-    props.put("metadata.broker.list", "localhost:9092");
-    // Use random partitioner. Don't need the key type. Just set it to Integer.
-    // The message is of type String.
-    producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
+    Properties props = new Properties();
+    props.put("bootstrap.servers", "localhost:9092");
+    props.put("client.id", "DemoProducer");
+    props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
+    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    producer = new KafkaProducer<Integer, String>(props);
     this.topic = topic;
+    this.isAsync = isAsync;
   }
-  
+
   public void run() {
     int messageNo = 1;
     while(true)
     {
-      String messageStr = new String("Message_" + messageNo);
-      producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
-      messageNo++;
+      String messageStr = "Message_" + messageNo;
+      long startTime = System.currentTimeMillis();
+      if (isAsync) { // Send asynchronously
+        producer.send(new ProducerRecord<Integer, String>(topic,
+            messageNo,
+            messageStr), new DemoCallBack(startTime, messageNo, messageStr));
+      } else { // Send synchronously
+        try {
+          producer.send(new ProducerRecord<Integer, String>(topic,
+              messageNo,
+              messageStr)).get();
+          System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        } catch (ExecutionException e) {
+          e.printStackTrace();
+        }
+      }
+      ++messageNo;
     }
   }
+}
+
+class DemoCallBack implements Callback {
+
+  private long startTime;
+  private int key;
+  private String message;
 
+  public DemoCallBack(long startTime, int key, String message) {
+    this.startTime = startTime;
+    this.key = key;
+    this.message = message;
+  }
+
+  /**
+   * A callback method the user can implement to provide asynchronous handling of request
completion. This method will
+   * be called when the record sent to the server has been acknowledged. Exactly one of the
arguments will be
+   * non-null.
+   *
+   * @param metadata  The metadata for the record that was sent (i.e. the partition and offset).
Null if an error
+   *                  occurred.
+   * @param exception The exception thrown during processing of this record. Null if no error
occurred.
+   */
+  public void onCompletion(RecordMetadata metadata, Exception exception) {
+    long elapsedTime = System.currentTimeMillis() - startTime;
+    if (metadata != null) {
+      System.out.println(
+          "message(" + key + ", " + message + ") sent to partition(" + metadata.partition()
+
+              "), " +
+              "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
+    } else {
+      exception.printStackTrace();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5408931a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
index 0d66fe5..e5096f0 100644
--- a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
+++ b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
@@ -42,9 +42,9 @@ public class SimpleConsumerDemo {
   }
 
   private static void generateData() {
-    Producer producer2 = new Producer(KafkaProperties.topic2);
+    Producer producer2 = new Producer(KafkaProperties.topic2, false);
     producer2.start();
-    Producer producer3 = new Producer(KafkaProperties.topic3);
+    Producer producer3 = new Producer(KafkaProperties.topic3, false);
     producer3.start();
     try {
       Thread.sleep(1000);


Mime
View raw message