metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nickal...@apache.org
Subject [01/10] metron git commit: METRON-1573 Enhance KAFKA_* functions to return partition and offset details (nickwallen) closes apache/metron#1030
Date Fri, 22 Jun 2018 15:43:56 GMT
Repository: metron
Updated Branches:
  refs/heads/feature/METRON-1416-upgrade-solr 975923e8d -> 1767727a7


METRON-1573 Enhance KAFKA_* functions to return partition and offset details (nickwallen)
closes apache/metron#1030


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/0bb35800
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/0bb35800
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/0bb35800

Branch: refs/heads/feature/METRON-1416-upgrade-solr
Commit: 0bb358009f1823fd19682ddb18d00aefa1441bf6
Parents: b081e80
Author: nickwallen <nick@nickallen.org>
Authored: Mon Jun 18 11:29:53 2018 -0400
Committer: nickallen <nickallen@apache.org>
Committed: Mon Jun 18 11:29:53 2018 -0400

----------------------------------------------------------------------
 .../metron/management/KafkaFunctions.java       | 173 +++++++++++++++++--
 .../KafkaFunctionsIntegrationTest.java          | 165 +++++++++++++++++-
 2 files changed, 322 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/0bb35800/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
index f256672..7c9c23f 100644
--- a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
+++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
@@ -18,6 +18,7 @@
 
 package org.apache.metron.management;
 
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -30,7 +31,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.metron.common.system.Clock;
-import org.apache.metron.profiler.client.stellar.Util;
 import org.apache.metron.stellar.common.LambdaExpression;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.apache.metron.stellar.common.utils.JSONUtils;
@@ -66,6 +66,7 @@ import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG;
  *  KAFKA_GET
  *  KAFKA_PUT
  *  KAFKA_TAIL
+ *  KAFKA_FIND
  *  KAFKA_PROPS
  */
 public class KafkaFunctions {
@@ -98,6 +99,30 @@ public class KafkaFunctions {
   public static final int DEFAULT_MAX_WAIT = 5000;
 
   /**
+   * The key for the global property that defines how a message is returned
+   * from the set of KAFKA functions.
+   *
+   * <p>simple - The result contains only the message value as a string.
+   * <p>rich - The result contains the message value, topic, partition, and offset.
+   */
+  public static final String MESSAGE_VIEW_PROPERTY = "stellar.kafka.message.view";
+
+  /**
+   * An acceptable value for the 'stellar.kafka.message.view' property. The result
+   * provided will contain only the message value as a string.
+   */
+  public static final String MESSAGE_VIEW_SIMPLE = "simple";
+
+  /**
+   * An acceptable value for the 'stellar.kafka.message.view' property.
+   *
+   * <p>Provides a view of each message with more detailed metadata beyond just the
+   * message value.  The result provided will contain the message value, topic, partition,
+   * and offset.
+   */
+  public static final String MESSAGE_VIEW_RICH = "rich";
+
+  /**
    * The default set of Kafka properties.
    */
   private static Properties defaultProperties = defaultKafkaProperties();
@@ -137,6 +162,12 @@ public class KafkaFunctions {
    *   KAFKA_GET('topic', 1, { "auto.offset.reset": "earliest" })
    *   }
    * </pre>
+   *
+   * <p>By default, only the message value is returned. By setting the global property
+   * 'stellar.kafka.message.view' = 'rich' the function will return additional Kafka metadata
+   * including the topic, partition, offset, key, and timestamp contained in a map. Setting
+   * this property value to 'simple' or simply not setting the property value, will result
+   * in the default view behavior.
    */
   @Stellar(
           namespace = "KAFKA",
@@ -202,7 +233,8 @@ public class KafkaFunctions {
         while(messages.size() < count && wait < maxWait) {
 
           for(ConsumerRecord<String, String> record: consumer.poll(pollTimeout)) {
-            messages.add(record.value());
+            Object viewOfMessage = render(record, properties);
+            messages.add(viewOfMessage);
           }
 
           // how long have we waited?
@@ -247,6 +279,12 @@ public class KafkaFunctions {
    *   KAFKA_TAIL('topic', 10)
    *   }
    * </pre>
+   *
+   * <p>By default, only the message value is returned. By setting the global property
+   * 'stellar.kafka.message.view' = 'rich' the function will return additional Kafka metadata
+   * including the topic, partition, offset, key, and timestamp contained in a map. Setting
+   * this property value to 'simple' or simply not setting the property value, will result
+   * in the default view behavior.
    */
   @Stellar(
           namespace = "KAFKA",
@@ -312,7 +350,8 @@ public class KafkaFunctions {
         while(messages.size() < count && wait < maxWait) {
 
           for(ConsumerRecord<String, String> record: consumer.poll(pollTimeout)) {
-            messages.add(record.value());
+            Object viewOfMessage = render(record, properties);
+            messages.add(viewOfMessage);
           }
 
           // how long have we waited?
@@ -357,6 +396,7 @@ public class KafkaFunctions {
    *  KAFKA_PUT('topic', ["message1"], { "bootstrap.servers": "kafka-broker-1:6667" })
    *  }
    * </pre>
+   *
    */
   @Stellar(
           namespace = "KAFKA",
@@ -394,9 +434,49 @@ public class KafkaFunctions {
 
       // send the messages
       Properties properties = buildKafkaProperties(overrides, context);
-      putMessages(topic, messages, properties);
+      List<RecordMetadata> records = putMessages(topic, messages, properties);
 
-      return null;
+      // render a view of the messages that were written for the user
+      Object view = render(records, properties);
+      return view;
+    }
+
+    /**
+     * Render a view of the {@link RecordMetadata} that resulted from writing
+     * messages to Kafka.
+     *
+     * @param records The record metadata.
+     * @param properties The properties.
+     * @return
+     */
+    private Object render(List<RecordMetadata> records, Properties properties) {
+
+      Object view;
+      if(MESSAGE_VIEW_RICH.equals(getMessageView(properties))) {
+
+        // build a 'rich' view of the messages that were written
+        List<Object> responses = new ArrayList<>();
+        for(RecordMetadata record: records) {
+
+          // render the 'rich' view of the record
+          Map<String, Object> richView = new HashMap<>();
+          richView.put("topic", record.topic());
+          richView.put("partition", record.partition());
+          richView.put("offset", record.offset());
+          richView.put("timestamp", record.timestamp());
+
+          responses.add(richView);
+        }
+
+        // the rich view is a list of maps containing metadata about how each message was written
+        view = responses;
+
+      } else {
+
+        // otherwise, the view is simply a count of the number of messages written
+        view = CollectionUtils.size(records);
+      }
+      return view;
     }
 
     /**
@@ -407,9 +487,11 @@ public class KafkaFunctions {
      * @param topic The topic to send messages to.
      * @param messages The messages to send.
      * @param properties The properties to use with Kafka.
+     * @return Metadata about all the records written to Kafka.
      */
-    private void putMessages(String topic, List<String> messages, Properties properties) {
+    private List<RecordMetadata> putMessages(String topic, List<String> messages, Properties properties) {
       LOG.debug("KAFKA_PUT sending messages; topic={}, count={}", topic, messages.size());
+      List<RecordMetadata> records = new ArrayList<>();
       try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
 
         List<Future<RecordMetadata>> futures = new ArrayList<>();
@@ -422,11 +504,14 @@ public class KafkaFunctions {
 
         // wait for the sends to complete
         for(Future<RecordMetadata> future : futures) {
-          waitForResponse(future, properties);
+          RecordMetadata record = waitForResponse(future, properties);
+          records.add(record);
         }
 
         producer.flush();
       }
+
+      return records;
     }
 
     /**
@@ -434,19 +519,23 @@ public class KafkaFunctions {
      *
      * @param future The future for the message being sent.
      * @param properties The configuration properties.
-     * @return
+     * @return Metadata about the record that was written to Kafka.
      */
-    private void waitForResponse(Future<RecordMetadata> future, Properties properties) {
+    private RecordMetadata waitForResponse(Future<RecordMetadata> future, Properties properties) {
+      RecordMetadata record = null;
       int maxWait = getMaxWait(properties);
+
       try {
         // wait for the record and then render it for the user
-        RecordMetadata record = future.get(maxWait, TimeUnit.MILLISECONDS);
+        record = future.get(maxWait, TimeUnit.MILLISECONDS);
         LOG.debug("KAFKA_PUT message sent; topic={}, partition={}, offset={}",
                 record.topic(), record.partition(), record.offset());
 
       } catch(TimeoutException | InterruptedException | ExecutionException e) {
         LOG.error("KAFKA_PUT message send failure", e);
       }
+
+      return record;
     }
 
     @Override
@@ -528,6 +617,12 @@ public class KafkaFunctions {
    * KAFKA_FIND('topic', m -> MAP_EXISTS('geo', m), 10)
    * }
    * </pre>
+   *
+   * <p>By default, only the message value is returned. By setting the global property
+   * 'stellar.kafka.message.view' = 'rich' the function will return additional Kafka metadata
+   * including the topic, partition, offset, key, and timestamp contained in a map. Setting
+   * this property value to 'simple' or simply not setting the property value, will result
+   * in the default view behavior.
    */
   @Stellar(
           namespace = "KAFKA",
@@ -601,7 +696,8 @@ public class KafkaFunctions {
 
             // only keep the message if the filter expression is satisfied
             if(isSatisfied(filter, record.value())) {
-              messages.add(record.value());
+              Object view = render(record, properties);
+              messages.add(view);
 
               // do we have enough messages already?
               if(messages.size() >= count) {
@@ -667,6 +763,41 @@ public class KafkaFunctions {
   }
 
   /**
+   * Renders the Kafka record into a view.
+   *
+   * <p>A user can customize the way in which a Kafka record is rendered by altering
+   * the "stellar.kafka.message.view" property.
+   *
+   * @param record The Kafka record to render.
+   * @param properties The properties which allows a user to customize the rendered view of a record.
+   * @return
+   */
+  private static Object render(ConsumerRecord<String, String> record, Properties properties) {
+    LOG.debug("Render message; topic={}, partition={}, offset={}",
+            record.topic(), record.partition(), record.offset());
+
+    Object result;
+    if(MESSAGE_VIEW_RICH.equals(getMessageView(properties))) {
+      // build the detailed view of the record
+      Map<String, Object> view = new HashMap<>();
+      view.put("value", record.value());
+      view.put("topic", record.topic());
+      view.put("partition", record.partition());
+      view.put("offset", record.offset());
+      view.put("timestamp", record.timestamp());
+      view.put("key", record.key());
+
+      result = view;
+
+    } else {
+      // default to the simple view
+      result = record.value();
+    }
+
+    return result;
+  }
+
+  /**
    * Manually assigns all partitions in a topic to a consumer
    *
    * @param topic The topic whose partitions will be assigned.
@@ -756,6 +887,23 @@ public class KafkaFunctions {
   }
 
   /**
+   * Determines how Kafka messages should be rendered for the user.
+   *
+   * @param properties The properties.
+   * @return How the Kafka messages should be rendered.
+   */
+  private static String getMessageView(Properties properties) {
+    // defaults to the simple view
+    String messageView = MESSAGE_VIEW_SIMPLE;
+
+    if(properties.containsKey(MESSAGE_VIEW_PROPERTY)) {
+      messageView = ConversionUtils.convert(properties.get(MESSAGE_VIEW_PROPERTY), String.class);
+    }
+
+    return messageView;
+  }
+
+  /**
    * Defines a minimal set of default parameters that can be overridden
    * via the global properties.
    */
@@ -792,6 +940,9 @@ public class KafkaFunctions {
     // set the default poll timeout
     properties.put(POLL_TIMEOUT_PROPERTY, DEFAULT_POLL_TIMEOUT);
 
+    // set the default message view
+    properties.put(MESSAGE_VIEW_PROPERTY, MESSAGE_VIEW_SIMPLE);
+
     return properties;
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/0bb35800/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
index d82bb37..5e045ad 100644
--- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
+++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
@@ -48,6 +48,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -153,7 +155,45 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest {
     variables.put("topic", topicName);
 
     // put a message onto the topic
-    run("KAFKA_PUT(topic, [message1])");
+    assertEquals(1, run("KAFKA_PUT(topic, [message1])"));
+
+    // validate the message in the topic
+    assertEquals(Collections.singletonList(message1), run("KAFKA_GET(topic)"));
+  }
+
+  /**
+   * KAFKA_PUT should be able to write multiple message to a topic.
+   */
+  @Test
+  public void testKafkaPutMultipleMessages() {
+
+    // use a unique topic name for this test
+    final String topicName = testName.getMethodName();
+    variables.put("topic", topicName);
+
+    // put a message onto the topic
+    assertEquals(2, run("KAFKA_PUT(topic, [message1, message2])"));
+
+    // validate the message in the topic
+    List<String> expected = new ArrayList<String>() {{
+      add(message1);
+      add(message2);
+    }};
+    assertEquals(expected, run("KAFKA_GET(topic, 2)"));
+  }
+
+  /**
+   * KAFKA_PUT should be able to write a message passed as a String, rather than a List.
+   */
+  @Test
+  public void testKafkaPutOneMessagePassedAsString() {
+
+    // use a unique topic name for this test
+    final String topicName = testName.getMethodName();
+    variables.put("topic", topicName);
+
+    // put a message onto the topic - the message is just a string, not a list
+    run("KAFKA_PUT(topic, message1)");
 
     // get a message from the topic
     Object actual = run("KAFKA_GET(topic)");
@@ -166,7 +206,40 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest {
    * KAFKA_PUT should be able to write a message passed as a String, rather than a List.
    */
   @Test
-  public void testKafkaPutOneMessagePassedAsString() {
+  public void testKafkaPutWithRichView() {
+
+    // configure a detailed view of each message
+    global.put(KafkaFunctions.MESSAGE_VIEW_PROPERTY, KafkaFunctions.MESSAGE_VIEW_RICH);
+
+    // use a unique topic name for this test
+    final String topicName = testName.getMethodName();
+    variables.put("topic", topicName);
+
+    // put a message onto the topic - the message is just a string, not a list
+    Object actual = run("KAFKA_PUT(topic, message1)");
+
+    // validate
+    assertTrue(actual instanceof List);
+    List<Object> results = (List) actual;
+    assertEquals(1, results.size());
+
+    // expect a 'rich' view of the record
+    Map<String, Object> view = (Map) results.get(0);
+    assertEquals(topicName, view.get("topic"));
+    assertEquals(0, view.get("partition"));
+    assertEquals(0L, view.get("offset"));
+    assertNotNull(view.get("timestamp"));
+
+  }
+
+  /**
+   * KAFKA_GET should allow a user to see a detailed view of each Kafka record.
+   */
+  @Test
+  public void testKafkaGetWithRichView() {
+
+    // configure a detailed view of each message
+    global.put(KafkaFunctions.MESSAGE_VIEW_PROPERTY, KafkaFunctions.MESSAGE_VIEW_RICH);
 
     // use a unique topic name for this test
     final String topicName = testName.getMethodName();
@@ -179,7 +252,17 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest {
     Object actual = run("KAFKA_GET(topic)");
 
     // validate
-    assertEquals(Collections.singletonList(message1), actual);
+    assertTrue(actual instanceof List);
+    List<Object> results = (List) actual;
+    assertEquals(1, results.size());
+
+    // expect a 'rich' view of the record
+    Map<String, Object> view = (Map) results.get(0);
+    assertNull(view.get("key"));
+    assertEquals(0L, view.get("offset"));
+    assertEquals(0, view.get("partition"));
+    assertEquals(topicName, view.get("topic"));
+    assertEquals(message1, view.get("value"));
   }
 
   /**
@@ -300,6 +383,45 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest {
   }
 
   /**
+   * KAFKA_TAIL should allow a user to see a rich view of each Kafka record.
+   */
+  @Test
+  public void testKafkaTailWithRichView() throws Exception {
+
+    // configure a detailed view of each message
+    global.put(KafkaFunctions.MESSAGE_VIEW_PROPERTY, KafkaFunctions.MESSAGE_VIEW_RICH);
+
+    // use a unique topic name for this test
+    final String topicName = testName.getMethodName();
+    variables.put("topic", topicName);
+
+    // put multiple messages onto the topic; KAFKA tail should NOT retrieve these
+    run("KAFKA_PUT(topic, [message2, message2, message2])");
+
+    // get a message from the topic; will block until messages arrive
+    Future<Object> tailFuture = runAsync("KAFKA_TAIL(topic, 1)");
+
+    // put 10 messages onto the topic for KAFKA_TAIL to grab
+    runAsyncAndWait(Collections.nCopies(10, "KAFKA_PUT(topic, [message1])"));
+
+    // wait for KAFKA_TAIL to complete
+    Object actual = tailFuture.get(10, TimeUnit.SECONDS);
+
+    // validate
+    assertTrue(actual instanceof List);
+    List<Object> results = (List) actual;
+    assertEquals(1, results.size());
+
+    // expect a 'rich' view of the record
+    Map<String, Object> view = (Map) results.get(0);
+    assertNull(view.get("key"));
+    assertEquals(0, view.get("partition"));
+    assertEquals(topicName, view.get("topic"));
+    assertEquals(message1, view.get("value"));
+    assertNotNull(view.get("offset"));
+  }
+
+  /**
    * KAFKA_PROPS should return the set of properties used to configure the Kafka consumer
    *
   * The properties used for the KAFKA_* functions are calculated by compiling the default, global and user
@@ -339,7 +461,7 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest {
     Map<String, String> properties = (Map<String, String>) run(expression);
     assertEquals(expected, properties.get(overriddenKey));
   }
-  
+
   /**
    * KAFKA_FIND should only return messages that satisfy a filter expression.
    */
@@ -385,6 +507,40 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest {
   }
 
   /**
+   * KAFKA_FIND should allow a user to see a detailed view of each Kafka record.
+   */
+  @Test
+  public void testKafkaFindWithRichView() throws Exception {
+
+    // configure a detailed view of each message
+    global.put(KafkaFunctions.MESSAGE_VIEW_PROPERTY, KafkaFunctions.MESSAGE_VIEW_RICH);
+
+    // use a unique topic name for this test
+    final String topicName = testName.getMethodName();
+    variables.put("topic", topicName);
+
+    // find all messages satisfying the filter expression
+    Future<Object> future = runAsync("KAFKA_FIND(topic, m -> MAP_GET('value', m) == 23)");
+
+    // put 10 messages onto the topic for KAFKA_TAIL to grab
+    runAsyncAndWait(Collections.nCopies(10, "KAFKA_PUT(topic, [message2])"));
+
+    // validate
+    Object actual = future.get(10, TimeUnit.SECONDS);
+    assertTrue(actual instanceof List);
+    List<Object> results = (List) actual;
+    assertEquals(1, results.size());
+
+    // expect a 'rich' view of the record
+    Map<String, Object> view = (Map) results.get(0);
+    assertNull(view.get("key"));
+    assertNotNull(view.get("offset"));
+    assertEquals(0, view.get("partition"));
+    assertEquals(topicName, view.get("topic"));
+    assertEquals(message2, view.get("value"));
+  }
+
+  /**
    * KAFKA_FIND should return no more messages than its limit.
    */
   @Test
@@ -491,4 +647,3 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest {
     }
   }
 }
-


Mime
View raw message