karaf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject karaf-decanter git commit: KARAF-4430 - Introduce request.timeout.ms support and add a callback to deal with timeout
Date Wed, 23 Mar 2016 09:56:58 GMT
Repository: karaf-decanter
Updated Branches:
  refs/heads/master 28a1f8e3e -> c31758f2a


KARAF-4430 - Introduce request.timeout.ms support and add a callback to deal with timeout


Project: http://git-wip-us.apache.org/repos/asf/karaf-decanter/repo
Commit: http://git-wip-us.apache.org/repos/asf/karaf-decanter/commit/c31758f2
Tree: http://git-wip-us.apache.org/repos/asf/karaf-decanter/tree/c31758f2
Diff: http://git-wip-us.apache.org/repos/asf/karaf-decanter/diff/c31758f2

Branch: refs/heads/master
Commit: c31758f2a7749c879669e6375d60564e8f326691
Parents: 28a1f8e
Author: Jean-Baptiste Onofré <jbonofre@apache.org>
Authored: Wed Mar 23 10:56:22 2016 +0100
Committer: Jean-Baptiste Onofré <jbonofre@apache.org>
Committed: Wed Mar 23 10:56:22 2016 +0100

----------------------------------------------------------------------
 .../org.apache.karaf.decanter.appender.kafka.cfg    |  3 +++
 .../decanter/appender/kafka/KafkaAppender.java      | 16 +++++++++++++++-
 manual/src/main/asciidoc/user-guide/appenders.adoc  |  3 +++
 3 files changed, 21 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/c31758f2/appender/kafka/src/main/cfg/org.apache.karaf.decanter.appender.kafka.cfg
----------------------------------------------------------------------
diff --git a/appender/kafka/src/main/cfg/org.apache.karaf.decanter.appender.kafka.cfg b/appender/kafka/src/main/cfg/org.apache.karaf.decanter.appender.kafka.cfg
index ee606d0..5821bc0 100644
--- a/appender/kafka/src/main/cfg/org.apache.karaf.decanter.appender.kafka.cfg
+++ b/appender/kafka/src/main/cfg/org.apache.karaf.decanter.appender.kafka.cfg
@@ -33,6 +33,9 @@
 # Serializer class for value that implements the Serializer interface.
 # value.serializer=org.apache.kafka.common.serialization.StringSerializer
 
+# Producer request timeout
+# request.timeout.ms=5000
+
 # Name of the topic
 # topic=decanter
 

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/c31758f2/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/KafkaAppender.java b/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/KafkaAppender.java
index a4e4e09..6c7a31c 100644
--- a/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/KafkaAppender.java
+++ b/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/KafkaAppender.java
@@ -19,8 +19,11 @@ package org.apache.karaf.decanter.appender.kafka;
 import java.util.Dictionary;
 import java.util.Properties;
 
+import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
@@ -77,6 +80,9 @@ public class KafkaAppender implements EventHandler {
         String keySerializer = getValue(config, "key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         properties.put("key.serializer", keySerializer);
 
+        String requestTimeoutMs = getValue(config, "request.timeout.ms", "5000");
+        properties.put("request.timeout.ms", requestTimeoutMs);
+
         String valueSerializer = getValue(config, "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         properties.put("value.serializer", valueSerializer);
 
@@ -146,7 +152,15 @@ public class KafkaAppender implements EventHandler {
         try {
             String type = (String)event.getProperty("type");
             String data = marshaller.marshal(event);
-            producer.send(new ProducerRecord<>(topic, type, data));
+            producer.send(new ProducerRecord<>(topic, type, data), new Callback() {
+                @Override
+                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
+                    if (e != null) {
+                        LOGGER.warn("Can't send event to Kafka broker", e);
+                    }
+                }
+            });
+            producer.flush();
         } catch (RuntimeException e) {
             LOGGER.warn("Error sending event to kafka", e);
         }

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/c31758f2/manual/src/main/asciidoc/user-guide/appenders.adoc
----------------------------------------------------------------------
diff --git a/manual/src/main/asciidoc/user-guide/appenders.adoc b/manual/src/main/asciidoc/user-guide/appenders.adoc
index 86fe6df..f43a26c 100644
--- a/manual/src/main/asciidoc/user-guide/appenders.adoc
+++ b/manual/src/main/asciidoc/user-guide/appenders.adoc
@@ -931,6 +931,9 @@ This feature installs a default `etc/org.apache.karaf.decanter.appender.kafka.cf
 # Serializer class for value that implements the Serializer interface.
 # value.serializer=org.apache.kafka.common.serialization.StringSerializer
 
+# Producer request timeout
+# request.timeout.ms=5000
+
 # Name of the topic
 # topic=decanter
 


Mime
View raw message