metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nickal...@apache.org
Subject metron git commit: METRON-1572 Enhance KAFKA_PUT function (nickwallen) closes apache/metron#1024
Date Fri, 08 Jun 2018 12:39:46 GMT
Repository: metron
Updated Branches:
  refs/heads/master ae1d3eb9a -> 40796c06a


METRON-1572 Enhance KAFKA_PUT function (nickwallen) closes apache/metron#1024


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/40796c06
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/40796c06
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/40796c06

Branch: refs/heads/master
Commit: 40796c06ad96ae45dd853925fbae8c26509f6c2f
Parents: ae1d3eb
Author: nickwallen <nick@nickallen.org>
Authored: Fri Jun 8 08:39:20 2018 -0400
Committer: nickallen <nickallen@apache.org>
Committed: Fri Jun 8 08:39:20 2018 -0400

----------------------------------------------------------------------
 .../metron/management/KafkaFunctions.java       | 87 +++++++++++++++-----
 .../KafkaFunctionsIntegrationTest.java          | 21 +++++
 2 files changed, 89 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/40796c06/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
index 316e19d..a0c92eb 100644
--- a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
+++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -37,7 +38,7 @@ import org.slf4j.LoggerFactory;
 
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -46,6 +47,9 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static java.lang.String.format;
 import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG;
@@ -332,18 +336,26 @@ public class KafkaFunctions {
   /**
    * KAFKA_PUT
    *
-   * Sends messages to a Kafka topic.
+   * <p>Sends messages to a Kafka topic.
    *
-   * Example: Put two messages on the topic 'topic'.
+   * <p>Example: Put two messages on the topic 'topic'.
+   * <pre>
+   *  {@code
    *  KAFKA_PUT('topic', ["message1", "message2"])
+   *  }
+   * </pre>
    *
-   * Example: Put a message on a topic and also define an alternative Kafka broker.
+   * <p>Example: Put a message on a topic and also define an alternative Kafka broker.
+   * <pre>
+   *  {@code
    *  KAFKA_PUT('topic', ["message1"], { "bootstrap.servers": "kafka-broker-1:6667" })
+   *  }
+   * </pre>
    */
   @Stellar(
           namespace = "KAFKA",
           name = "PUT",
-          description = "Sends messages to a Kafka topic.",
+          description = "Sends messages to a Kafka topic. ",
           params = {
                   "topic - The name of the Kafka topic.",
                   "messages - A list of messages to write.",
@@ -355,45 +367,82 @@ public class KafkaFunctions {
 
     @Override
     public Object apply(List<Object> args, Context context) throws ParseException {
-
       String topic = ConversionUtils.convert(args.get(0), String.class);
-      List<String> messages = ConversionUtils.convert(args.get(1), List.class);
 
-      // build the properties for kafka
+      List<String> messages;
+      if(args.get(1) instanceof String) {
+        // a single message needs sent
+        String msg = ConversionUtils.convert(args.get(1), String.class);
+        messages = Collections.singletonList(msg);
+
+      } else {
+        // a list of messages; all need sent
+        messages = ConversionUtils.convert(args.get(1), List.class);
+      }
+
+      // are there any overrides?
       Map<String, String> overrides = new HashMap<>();
       if(args.size() > 2) {
         overrides = ConversionUtils.convert(args.get(2), Map.class);
       }
-      Properties properties = buildKafkaProperties(overrides, context);
 
       // send the messages
-      try {
-        send(topic, messages, properties);
-
-      } catch(InterruptedException | ExecutionException e) {
-        throw new ParseException(e.getMessage(), e);
-      }
+      Properties properties = buildKafkaProperties(overrides, context);
+      putMessages(topic, messages, properties);
 
       return null;
     }
 
     /**
-     * Send each message synchronously.
+     * Put messages to a Kafka topic.
+     *
+     * <p>Sends each message synchronously.
+     *
      * @param topic The topic to send messages to.
      * @param messages The messages to send.
      * @param properties The properties to use with Kafka.
      */
-    private void send(String topic, List<String> messages, Properties properties) throws
InterruptedException, ExecutionException {
+    private void putMessages(String topic, List<String> messages, Properties properties)
{
+      LOG.debug("KAFKA_PUT sending messages; topic={}, count={}", topic, messages.size());
       try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties))
{
 
-        // send each message synchronously, hence the get()
+        List<Future<RecordMetadata>> futures = new ArrayList<>();
+
+        // send each message
         for(String msg : messages) {
-          producer.send(new ProducerRecord<>(topic, msg)).get();
+          Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic,
msg));
+          futures.add(future);
+        }
+
+        // wait for the sends to complete
+        for(Future<RecordMetadata> future : futures) {
+          waitForResponse(future, properties);
         }
+
         producer.flush();
       }
     }
 
+    /**
+     * Wait for response to the message being sent.
+     *
+     * @param future The future for the message being sent.
+     * @param properties The configuration properties.
+     * @return
+     */
+    private void waitForResponse(Future<RecordMetadata> future, Properties properties)
{
+      int maxWait = getMaxWait(properties);
+      try {
+        // wait for the record and then render it for the user
+        RecordMetadata record = future.get(maxWait, TimeUnit.MILLISECONDS);
+        LOG.debug("KAFKA_PUT message sent; topic={}, partition={}, offset={}",
+                record.topic(), record.partition(), record.offset());
+
+      } catch(TimeoutException | InterruptedException | ExecutionException e) {
+        LOG.error("KAFKA_PUT message send failure", e);
+      }
+    }
+
     @Override
     public void initialize(Context context) {
       // no initialization required

http://git-wip-us.apache.org/repos/asf/metron/blob/40796c06/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
index 74c6705..ad45b52 100644
--- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
+++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
@@ -161,6 +161,26 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest
{
   }
 
   /**
+   * KAFKA_PUT should be able to write a message passed as a String, rather than a List.
+   */
+  @Test
+  public void testKafkaPutOneMessagePassedAsString() {
+
+    // use a unique topic name for this test
+    final String topicName = testName.getMethodName();
+    variables.put("topic", topicName);
+
+    // put a message onto the topic - the message is just a string, not a list
+    run("KAFKA_PUT(topic, message1)");
+
+    // get a message from the topic
+    Object actual = run("KAFKA_GET(topic)");
+
+    // validate
+    assertEquals(Collections.singletonList(message1), actual);
+  }
+
+  /**
    * KAFKA_PUT should be able to write multiple messages passed as a List.
    * KAFKA_GET should be able to read multiple messages at once.
    */
@@ -373,3 +393,4 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest
{
     }
   }
 }
+


Mime
View raw message