metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmiklav...@apache.org
Subject [04/50] [abbrv] metron git commit: METRON-1594: KafkaWriter is asynchronous and may lose data on node failure (mmiklavc via mmiklavc) closes apache/metron#1045
Date Mon, 11 Jun 2018 15:45:10 GMT
http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-indexing/src/main/flux/indexing/batch/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/flux/indexing/batch/remote.yaml b/metron-platform/metron-indexing/src/main/flux/indexing/batch/remote.yaml
index 3087dd9..919654c 100644
--- a/metron-platform/metron-indexing/src/main/flux/indexing/batch/remote.yaml
+++ b/metron-platform/metron-indexing/src/main/flux/indexing/batch/remote.yaml
@@ -134,6 +134,7 @@ bolts:
         className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
         constructorArgs:
             - "${kafka.zk}"
+            - "INDEXING"
         configMethods:
             -   name: "withBulkMessageWriter"
                 args:
@@ -147,8 +148,9 @@ bolts:
         className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
         constructorArgs:
             - "${kafka.zk}"
+            - "INDEXING"
         configMethods:
-            -   name: "withMessageWriter"
+            -   name: "withBulkMessageWriter"
                 args:
                     - ref: "kafkaWriter"
 

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-indexing/src/main/flux/indexing/random_access/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/flux/indexing/random_access/remote.yaml
b/metron-platform/metron-indexing/src/main/flux/indexing/random_access/remote.yaml
index 429ba45..9aa2b24 100644
--- a/metron-platform/metron-indexing/src/main/flux/indexing/random_access/remote.yaml
+++ b/metron-platform/metron-indexing/src/main/flux/indexing/random_access/remote.yaml
@@ -106,6 +106,7 @@ bolts:
         className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
         constructorArgs:
             - "${kafka.zk}"
+            - "INDEXING"
         configMethods:
             -   name: "withBulkMessageWriter"
                 args:
@@ -119,8 +120,9 @@ bolts:
         className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
         constructorArgs:
             - "${kafka.zk}"
+            - "INDEXING"
         configMethods:
-            -   name: "withMessageWriter"
+            -   name: "withBulkMessageWriter"
                 args:
                     - ref: "kafkaWriter"
 

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-parsers/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md
index e8b2896..8254baf 100644
--- a/metron-platform/metron-parsers/README.md
+++ b/metron-platform/metron-parsers/README.md
@@ -161,7 +161,11 @@ Example Stellar Filter which includes messages which contain the `field1`
fiel
 then it is assumed to be a regex and will match any topic matching the pattern (e.g. `/bro.*/`
would match `bro_cust0`, `bro_cust1` and `bro_cust2`)
 * `readMetadata` : Boolean indicating whether to read metadata or not (`false` by default).
 See below for a discussion about metadata.
 * `mergeMetadata` : Boolean indicating whether to merge metadata with the message or not
(`false` by default).  See below for a discussion about metadata.
-* `parserConfig` : A JSON Map representing the parser implementation specific configuration.
+* `parserConfig` : A JSON Map representing the parser implementation specific configuration.
Also include batch sizing and timeout for writer configuration here.
+  * `batchSize` : Integer indicating number of records to batch together before sending to
the writer. (default to `15`)
+  * `batchTimeout` : The timeout after which a batch will be flushed even if batchSize has
not been met.  Optional.
+    If unspecified, or set to `0`, it defaults to a system-determined duration which is a
fraction of the Storm
+    parameter `topology.message.timeout.secs`.  Ignored if batchSize is `1`, since this disables
batching.
 * `fieldTransformations` : An array of complex objects representing the transformations to
be done on the message generated from the parser before writing out to the kafka topic.
 * `spoutParallelism` : The kafka spout parallelism (default to `1`).  This can be overridden
on the command line.
 * `spoutNumTasks` : The number of tasks for the spout (default to `1`). This can be overridden
on the command line.

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index dd59355..0e9b48f 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -19,6 +19,7 @@ package org.apache.metron.parsers.bolt;
 
 import static org.apache.metron.common.Constants.METADATA_PREFIX;
 
+import com.github.benmanes.caffeine.cache.Cache;
 import java.io.IOException;
 import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
@@ -30,15 +31,15 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.function.Function;
 import java.util.stream.Collectors;
-
-import com.github.benmanes.caffeine.cache.Cache;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.bolt.ConfiguredParserBolt;
 import org.apache.metron.common.configuration.FieldTransformer;
 import org.apache.metron.common.configuration.FieldValidator;
 import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.error.MetronError;
 import org.apache.metron.common.message.MessageGetStrategy;
 import org.apache.metron.common.message.MessageGetters;
@@ -50,11 +51,15 @@ import org.apache.metron.parsers.interfaces.MessageParser;
 import org.apache.metron.stellar.common.CachingStellarProcessor;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.apache.metron.writer.WriterToBulkWriter;
+import org.apache.metron.writer.bolt.BatchTimeoutHelper;
+import org.apache.storm.Config;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,6 +76,10 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable
{
   private Context stellarContext;
   private transient MessageGetStrategy messageGetStrategy;
   private transient Cache<CachingStellarProcessor.Key, Object> cache;
+  private int requestedTickFreqSecs;
+  private int defaultBatchTimeout;
+  private int batchTimeoutDivisor = 1;
+
   public ParserBolt( String zookeeperUrl
                    , String sensorType
                    , MessageParser<JSONObject> parser
@@ -88,10 +97,85 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable
{
     return this;
   }
 
+  /**
+   * If this ParserBolt is in a topology where it is daisy-chained with
+   * other queuing Writers, then the max amount of time it takes for a tuple
+   * to clear the whole topology is the sum of all the batchTimeouts for all the
+   * daisy-chained Writers.  In the common case where each Writer is using the default
+   * batchTimeout, it is then necessary to divide that batchTimeout by the number of
+   * daisy-chained Writers.  There are no examples of daisy-chained batching Writers
+   * in the current Metron topologies, but the feature is available as a "fluent"-style
+   * mutator if needed.  It would be used in the parser topology builder.
+   * Default value, if not otherwise set, is 1.
+   *
+   * If non-default batchTimeouts are configured for some components, the administrator
+   * may want to take this behavior into account.
+   *
+   * @param batchTimeoutDivisor the number of daisy-chained batching Writers to divide the default batchTimeout by; must be positive
+   * @return ParserBolt
+   */
+  public ParserBolt withBatchTimeoutDivisor(int batchTimeoutDivisor) {
+    if (batchTimeoutDivisor <= 0) {
+      throw new IllegalArgumentException(String.format("batchTimeoutDivisor must be positive.
Value provided was %s", batchTimeoutDivisor));
+    }
+    this.batchTimeoutDivisor = batchTimeoutDivisor;
+    return this;
+  }
+
+  /**
+   * Used only for unit testing
+   * @param defaultBatchTimeout
+   */
+  protected void setDefaultBatchTimeout(int defaultBatchTimeout) {
+    this.defaultBatchTimeout = defaultBatchTimeout;
+  }
+
+  /**
+   * Used only for unit testing
+   */
+  public int getDefaultBatchTimeout() {
+    return defaultBatchTimeout;
+  }
+
   public MessageParser<JSONObject> getParser() {
     return parser;
   }
 
+  /**
+   * This method is called by TopologyBuilder.createTopology() to obtain topology and
+   * bolt specific configuration parameters.  We use it primarily to configure how often
+   * a tick tuple will be sent to our bolt.
+   * @return conf topology and bolt specific configuration parameters
+   */
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    // This is called long before prepare(), so do some of the same stuff as prepare() does,
+    // to get the valid WriterConfiguration.  But don't store any non-serializable objects,
+    // else Storm will throw a runtime error.
+    Function<WriterConfiguration, WriterConfiguration> configurationXform;
+    if(writer.isWriterToBulkWriter()) {
+      configurationXform = WriterToBulkWriter.TRANSFORMATION;
+    }
+    else {
+      configurationXform = x -> x;
+    }
+    WriterConfiguration writerconf = configurationXform
+        .apply(getConfigurationStrategy().createWriterConfig(writer.getBulkMessageWriter(),
getConfigurations()));
+
+    BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(writerconf::getAllConfiguredTimeouts,
batchTimeoutDivisor);
+    this.requestedTickFreqSecs = timeoutHelper.getRecommendedTickInterval();
+    //And while we've got BatchTimeoutHelper handy, capture the defaultBatchTimeout for writerComponent.
+    this.defaultBatchTimeout = timeoutHelper.getDefaultBatchTimeout();
+
+    Map<String, Object> conf = super.getComponentConfiguration();
+    if (conf == null) {
+      conf = new HashMap<String, Object>();
+    }
+    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, requestedTickFreqSecs);
+    LOG.info("Requesting " + Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS + " set to " + Integer.toString(requestedTickFreqSecs));
+    return conf;
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector)
{
@@ -114,6 +198,15 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable
{
     parser.init();
 
     writer.init(stormConf, context, collector, getConfigurations());
+    if (defaultBatchTimeout == 0) {
+      //This means getComponentConfiguration was never called to initialize defaultBatchTimeout,
+      //probably because we are in a unit test scenario.  So calculate it here.
+      WriterConfiguration writerConfig = getConfigurationStrategy()
+          .createWriterConfig(writer.getBulkMessageWriter(), getConfigurations());
+      BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(writerConfig::getAllConfiguredTimeouts,
batchTimeoutDivisor);
+      defaultBatchTimeout = timeoutHelper.getDefaultBatchTimeout();
+    }
+    writer.setDefaultBatchTimeout(defaultBatchTimeout);
 
     SensorParserConfig config = getSensorParserConfig();
     if(config != null) {
@@ -173,6 +266,17 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable
{
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
+    if (TupleUtils.isTick(tuple)) {
+      try {
+        writer.flush(getConfigurations(), messageGetStrategy);
+      } catch (Exception e) {
+        throw new RuntimeException(
+            "This should have been caught in the writerHandler.  If you see this, file a
JIRA", e);
+      } finally {
+        collector.ack(tuple);
+      }
+      return;
+    }
     byte[] originalMessage = (byte[]) messageGetStrategy.get(tuple);
     SensorParserConfig sensorParserConfig = getSensorParserConfig();
     try {

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
index 2192942..3916dea 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
@@ -18,31 +18,37 @@
 
 package org.apache.metron.parsers.bolt;
 
-import org.apache.metron.common.message.MessageGetStrategy;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.function.Function;
 import org.apache.metron.common.configuration.ParserConfigurations;
-import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
+import org.apache.metron.common.configuration.writer.ConfigurationStrategy;
+import org.apache.metron.common.configuration.writer.ConfigurationsStrategies;
 import org.apache.metron.common.configuration.writer.SingleBatchConfigurationFacade;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.message.MessageGetStrategy;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.MessageWriter;
 import org.apache.metron.writer.BulkWriterComponent;
 import org.apache.metron.writer.WriterToBulkWriter;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.function.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class WriterHandler implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private BulkMessageWriter<JSONObject> messageWriter;
   private transient BulkWriterComponent<JSONObject> writerComponent;
   private transient Function<ParserConfigurations, WriterConfiguration> writerTransformer;
   private boolean isBulk = false;
+  private ConfigurationStrategy configStrategy = ConfigurationsStrategies.PARSERS;
+
   public WriterHandler(MessageWriter<JSONObject> writer) {
     isBulk = false;
     messageWriter = new WriterToBulkWriter<>(writer);
@@ -53,17 +59,24 @@ public class WriterHandler implements Serializable {
     messageWriter = writer;
   }
 
-
   public boolean handleAck() {
     return isBulk;
   }
 
+  public boolean isWriterToBulkWriter() {
+    return messageWriter instanceof  WriterToBulkWriter;
+  }
+
+  public BulkMessageWriter getBulkMessageWriter() {
+    return messageWriter;
+  }
+
   public void init(Map stormConf, TopologyContext topologyContext, OutputCollector collector,
ParserConfigurations configurations) {
     if(isBulk) {
-      writerTransformer = config -> new ParserWriterConfiguration(config);
+      writerTransformer = config -> configStrategy.createWriterConfig(messageWriter, config);
     }
     else {
-      writerTransformer = config -> new SingleBatchConfigurationFacade(new ParserWriterConfiguration(config));
+      writerTransformer = config -> new SingleBatchConfigurationFacade(configStrategy.createWriterConfig(messageWriter,
config));
     }
     try {
       messageWriter.init(stormConf, topologyContext, writerTransformer.apply(configurations));
@@ -87,7 +100,29 @@ public class WriterHandler implements Serializable {
     writerComponent.write(sensorType, tuple, message, messageWriter, writerTransformer.apply(configurations),
messageGetStrategy);
   }
 
+  public void flush(ParserConfigurations configurations, MessageGetStrategy messageGetStrategy)
+      throws Exception {
+    if (!(messageWriter instanceof WriterToBulkWriter)) {
+      //WriterToBulkWriter doesn't allow batching, so no need to flush on Tick.
+      LOG.debug("Flushing message queues older than their batchTimeouts");
+      writerComponent.flushTimeouts(messageWriter, writerTransformer.apply(configurations),
+          messageGetStrategy);
+    }
+  }
+
   public void errorAll(String sensorType, Throwable e, MessageGetStrategy messageGetStrategy)
{
     writerComponent.errorAll(sensorType, e, messageGetStrategy);
   }
+
+  /**
+   * Sets batch timeout on the underlying component
+   * @param defaultBatchTimeout
+   */
+  public void setDefaultBatchTimeout(int defaultBatchTimeout) {
+    if (writerComponent == null) {
+      throw new UnsupportedOperationException("Must call init prior to calling this method.");
+    }
+    writerComponent.setDefaultBatchTimeout(defaultBatchTimeout);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index 6439b2b..15ce735 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -17,46 +17,59 @@
  */
 package org.apache.metron.parsers.bolt;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.configuration.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-import org.apache.metron.common.error.MetronError;
-import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
-import org.apache.metron.test.error.MetronErrorJSONMatcher;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
 import com.google.common.collect.ImmutableList;
-import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.stellar.dsl.Context;
-import org.apache.metron.common.writer.BulkMessageWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.adrianwalker.multilinestring.Multiline;
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.IndexingConfigurations;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.common.writer.MessageWriter;
+import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
 import org.apache.metron.parsers.BasicParser;
-import org.apache.metron.test.bolt.BaseBoltTest;
 import org.apache.metron.parsers.interfaces.MessageFilter;
 import org.apache.metron.parsers.interfaces.MessageParser;
-import org.apache.metron.common.writer.MessageWriter;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.test.bolt.BaseBoltTest;
+import org.apache.metron.test.error.MetronErrorJSONMatcher;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mock;
 
-import java.io.IOException;
-import java.util.*;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
-
 public class ParserBoltTest extends BaseBoltTest {
 
   @Mock
@@ -285,33 +298,6 @@ public class ParserBoltTest extends BaseBoltTest {
     parserBolt.execute(tuple);
     verify(outputCollector, times(1)).reportError(any(Throwable.class));
   }
-@Test
-public void testImplicitBatchOfOne() throws Exception {
-
-  String sensorType = "yaf";
-
-  ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter))
{
-    @Override
-    protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-      return ParserBoltTest.createUpdater();
-    }
-  };
-
-  parserBolt.setCuratorFramework(client);
-  parserBolt.setZKCache(cache);
-  parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
-  verify(parser, times(1)).init();
-  verify(batchWriter, times(1)).init(any(), any(), any());
-  when(parser.validate(any())).thenReturn(true);
-  when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject())));
-  when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
-  BulkWriterResponse response = new BulkWriterResponse();
-  response.addSuccess(t1);
-  when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(Collections.singleton(t1)),
any())).thenReturn(response);
-  parserBolt.withMessageFilter(filter);
-  parserBolt.execute(t1);
-  verify(outputCollector, times(1)).ack(t1);
-}
 
   /**
    {
@@ -343,7 +329,7 @@ public void testImplicitBatchOfOne() throws Exception {
       }
       @Override
       protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        return ParserBoltTest.createUpdater();
+        return ParserBoltTest.createUpdater(Optional.of(1));
       }
     };
 
@@ -459,7 +445,7 @@ public void testImplicitBatchOfOne() throws Exception {
 
       @Override
       protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        return ParserBoltTest.createUpdater();
+        return ParserBoltTest.createUpdater(Optional.of(1));
       }
     };
 
@@ -474,13 +460,14 @@ public void testImplicitBatchOfOne() throws Exception {
   }
 
   @Test
-  public void testBatchOfOne() throws Exception {
+  public void testDefaultBatchSize() throws Exception {
 
     String sensorType = "yaf";
 
     ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter))
{
       @Override
       protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+        // this uses default batch size
         return ParserBoltTest.createUpdater();
       }
     };
@@ -494,12 +481,93 @@ public void testImplicitBatchOfOne() throws Exception {
     when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject())));
     when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
     BulkWriterResponse response = new BulkWriterResponse();
+    Tuple[] uniqueTuples = new Tuple[ParserConfigurations.DEFAULT_KAFKA_BATCH_SIZE];
+    for (int i=0; i < uniqueTuples.length; i++) {
+      uniqueTuples[i] = mock(Tuple.class);
+      response.addSuccess(uniqueTuples[i]);
+    }
+    when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(new HashSet<>(Arrays.asList(uniqueTuples))),
any())).thenReturn(response);
+    parserBolt.withMessageFilter(filter);
+    for (Tuple tuple : uniqueTuples) {
+      parserBolt.execute(tuple);
+    }
+    for (Tuple uniqueTuple : uniqueTuples) {
+      verify(outputCollector, times(1)).ack(uniqueTuple);
+    }
+  }
+
+  @Test
+  public void testLessRecordsThanDefaultBatchSize() throws Exception {
+
+    String sensorType = "yaf";
+
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter))
{
+      @Override
+      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+        // this uses default batch size
+        return ParserBoltTest.createUpdater();
+      }
+    };
+
+    parserBolt.setCuratorFramework(client);
+    parserBolt.setZKCache(cache);
+    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
+    verify(parser, times(1)).init();
+    verify(batchWriter, times(1)).init(any(), any(), any());
+    when(parser.validate(any())).thenReturn(true);
+    when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject())));
+    when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
+    int oneLessThanDefaultBatchSize = ParserConfigurations.DEFAULT_KAFKA_BATCH_SIZE - 1;
+    BulkWriterResponse response = new BulkWriterResponse();
+    Tuple[] uniqueTuples = new Tuple[oneLessThanDefaultBatchSize];
+    for (int i=0; i < uniqueTuples.length; i++) {
+      uniqueTuples[i] = mock(Tuple.class);
+      response.addSuccess(uniqueTuples[i]);
+    }
+    parserBolt.withMessageFilter(filter);
+    for (Tuple tuple : uniqueTuples) {
+      parserBolt.execute(tuple);
+    }
+    // should have no acking yet - batch size not fulfilled
+    verify(outputCollector, never()).ack(any(Tuple.class));
+    response.addSuccess(t1); // used to achieve count in final verify
+    Iterable<Tuple> tuples = new HashSet(Arrays.asList(uniqueTuples)) {{
+      add(t1);
+    }};
+    when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(tuples), any())).thenReturn(response);
+    // meet batch size requirement and now it should ack
+    parserBolt.execute(t1);
+    verify(outputCollector, times(ParserConfigurations.DEFAULT_KAFKA_BATCH_SIZE)).ack(any(Tuple.class));
+  }
+
+  @Test
+  public void testBatchOfOne() throws Exception {
+
+    String sensorType = "yaf";
+
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter))
{
+      @Override
+      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+        return ParserBoltTest.createUpdater(Optional.of(1));
+      }
+    };
+
+    parserBolt.setCuratorFramework(client);
+    parserBolt.setZKCache(cache);
+    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
+    verify(parser, times(1)).init();
+    verify(batchWriter, times(1)).init(any(), any(), any());
+    when(parser.validate(any())).thenReturn(true);
+    when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject())));
+    when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
+    BulkWriterResponse response = new BulkWriterResponse();
     response.addSuccess(t1);
     when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(Collections.singleton(t1)),
any())).thenReturn(response);
     parserBolt.withMessageFilter(filter);
     parserBolt.execute(t1);
     verify(outputCollector, times(1)).ack(t1);
   }
+
   @Test
   public void testBatchOfFive() throws Exception {
 

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
index d565147..dfadfdc 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
@@ -94,6 +94,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
    *    "outputTopic": "output",
    *    "errorTopic": "parser_error",
    *    "parserConfig": {
+   *        "batchSize" : 1,
    *        "columns" : {
    *            "action" : 0,
    *            "dummy" : 1
@@ -246,7 +247,10 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
    *    "parserClassName":"org.apache.metron.writers.integration.WriterBoltIntegrationTest$EmptyObjectParser",
    *    "sensorTopic":"emptyobjectparser",
    *    "outputTopic": "enrichments",
-   *    "errorTopic": "parser_error"
+   *    "errorTopic": "parser_error",
+   *    "parserConfig": {
+   *        "batchSize" : 1
+   *    }
    * }
    */
   @Multiline

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java
index 22440ad..195e010 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java
@@ -57,7 +57,7 @@ public class BatchTimeoutHelper {
   protected int minBatchTimeoutRequestedSecs; //min of all sensorType configured batchTimeout
requests
   protected int recommendedTickIntervalSecs;  //the answer
 
-  BatchTimeoutHelper( Supplier<List<Integer>> listAllConfiguredTimeouts
+  public BatchTimeoutHelper( Supplier<List<Integer>> listAllConfiguredTimeouts
           , int batchTimeoutDivisor
           )
   {
@@ -130,7 +130,7 @@ public class BatchTimeoutHelper {
    * @return the max batchTimeout allowed, in seconds
    * Guaranteed positive number.
    */
-  protected int getDefaultBatchTimeout() {
+  public int getDefaultBatchTimeout() {
     if (!initialized) {this.init();}
     return maxBatchTimeoutAllowedSecs;
   }
@@ -160,7 +160,7 @@ public class BatchTimeoutHelper {
    * @return the recommended TickInterval to request, in seconds
    * Guaranteed positive number.
    */
-  protected int getRecommendedTickInterval() {
+  public int getRecommendedTickInterval() {
     if (!initialized) {this.init();}
     // Remember that parameter settings in the CLI override parameter settings set by the
Storm component.
     // We shouldn't have to deal with this in the Metron environment, but just in case,

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
index 1d8f0c6..4bb3888 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
@@ -17,10 +17,15 @@
  */
 package org.apache.metron.writer.bolt;
 
+import static org.apache.storm.utils.TupleUtils.isTick;
+
 import com.google.common.collect.ImmutableList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
 import org.apache.metron.common.Constants;
-import org.apache.metron.common.bolt.ConfiguredIndexingBolt;
-import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration;
+import org.apache.metron.common.bolt.ConfiguredBolt;
+import org.apache.metron.common.configuration.Configurations;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.message.MessageGetStrategy;
 import org.apache.metron.common.message.MessageGetters;
@@ -40,13 +45,7 @@ import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.Function;
-
-import static org.apache.storm.utils.TupleUtils.isTick;
-
-public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
+public class BulkMessageWriterBolt<CONFIG_T extends Configurations> extends ConfiguredBolt<CONFIG_T>
{
 
   private static final Logger LOG = LoggerFactory
           .getLogger(BulkMessageWriterBolt.class);
@@ -61,26 +60,26 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
   private int defaultBatchTimeout;
   private int batchTimeoutDivisor = 1;
 
-  public BulkMessageWriterBolt(String zookeeperUrl) {
-    super(zookeeperUrl);
+  public BulkMessageWriterBolt(String zookeeperUrl, String configurationStrategy) {
+    super(zookeeperUrl, configurationStrategy);
   }
 
-  public BulkMessageWriterBolt withBulkMessageWriter(BulkMessageWriter<JSONObject >
bulkMessageWriter) {
+  public BulkMessageWriterBolt<CONFIG_T> withBulkMessageWriter(BulkMessageWriter<JSONObject>
bulkMessageWriter) {
     this.bulkMessageWriter = bulkMessageWriter;
     return this;
   }
 
-  public BulkMessageWriterBolt withMessageWriter(MessageWriter<JSONObject> messageWriter)
{
+  public BulkMessageWriterBolt<CONFIG_T> withMessageWriter(MessageWriter<JSONObject>
messageWriter) {
     this.bulkMessageWriter = new WriterToBulkWriter<>(messageWriter);
     return this;
   }
 
-  public BulkMessageWriterBolt withMessageGetter(String messageGetStrategyType) {
+  public BulkMessageWriterBolt<CONFIG_T> withMessageGetter(String messageGetStrategyType)
{
     this.messageGetStrategyType = messageGetStrategyType;
     return this;
   }
 
-  public BulkMessageWriterBolt withMessageGetterField(String messageGetField) {
+  public BulkMessageWriterBolt<CONFIG_T> withMessageGetterField(String messageGetField)
{
     this.messageGetField = messageGetField;
     return this;
   }
@@ -103,7 +102,7 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
    * @param batchTimeoutDivisor
    * @return BulkMessageWriterBolt
    */
-  public BulkMessageWriterBolt withBatchTimeoutDivisor(int batchTimeoutDivisor) {
+  public BulkMessageWriterBolt<CONFIG_T> withBatchTimeoutDivisor(int batchTimeoutDivisor)
{
     if (batchTimeoutDivisor <= 0) {
       throw new IllegalArgumentException(String.format("batchTimeoutDivisor must be positive.
Value provided was %s", batchTimeoutDivisor));
     }
@@ -133,6 +132,7 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
   public void setWriterComponent(BulkWriterComponent<JSONObject> component) {
     writerComponent = component;
   }
+
   /**
    * This method is called by TopologyBuilder.createTopology() to obtain topology and
    * bolt specific configuration parameters.  We use it primarily to configure how often
@@ -151,8 +151,8 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
     else {
       configurationXform = x -> x;
     }
-    WriterConfiguration writerconf = configurationXform.apply(
-            new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()));
+    WriterConfiguration writerconf = configurationXform
+        .apply(getConfigurationStrategy().createWriterConfig(bulkMessageWriter, getConfigurations()));
 
     BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(writerconf::getAllConfiguredTimeouts,
batchTimeoutDivisor);
     this.requestedTickFreqSecs = timeoutHelper.getRecommendedTickInterval();
@@ -187,8 +187,8 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
       configurationTransformation = x -> x;
     }
     try {
-      WriterConfiguration writerconf = configurationTransformation.apply(
-              new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()));
+      WriterConfiguration writerconf = configurationTransformation
+          .apply(getConfigurationStrategy().createWriterConfig(bulkMessageWriter, getConfigurations()));
       if (defaultBatchTimeout == 0) {
         //This means getComponentConfiguration was never called to initialize defaultBatchTimeout,
         //probably because we are in a unit test scenario.  So calculate it here.
@@ -219,8 +219,8 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
           //WriterToBulkWriter doesn't allow batching, so no need to flush on Tick.
           LOG.debug("Flushing message queues older than their batchTimeouts");
           getWriterComponent().flushTimeouts(bulkMessageWriter, configurationTransformation.apply(
-                  new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))
-                  , messageGetStrategy);
+              getConfigurationStrategy().createWriterConfig(bulkMessageWriter, getConfigurations())),
+              messageGetStrategy);
         }
       }
       catch(Exception e) {
@@ -247,8 +247,8 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
       }
 
       LOG.trace("Writing enrichment message: {}", message);
-      WriterConfiguration writerConfiguration = configurationTransformation.apply(
-              new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()));
+      WriterConfiguration writerConfiguration = configurationTransformation
+          .apply(getConfigurationStrategy().createWriterConfig(bulkMessageWriter, getConfigurations()));
 
       if (writerConfiguration.isDefault(sensorType)) {
         //want to warn, but not fail the tuple

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
index c4e3998..efb2418 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
@@ -17,25 +17,32 @@
  */
 package org.apache.metron.writer.kafka;
 
-import org.apache.storm.tuple.Tuple;
 import com.google.common.base.Joiner;
+import java.io.Serializable;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Future;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.InterruptException;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.common.writer.MessageWriter;
-import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.apache.metron.common.utils.KafkaUtils;
 import org.apache.metron.common.utils.StringUtils;
+import org.apache.metron.common.writer.BulkMessageWriter;
+import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.apache.metron.writer.AbstractWriter;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
 
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObject>,
Serializable {
+public class KafkaWriter extends AbstractWriter implements BulkMessageWriter<JSONObject>,
Serializable {
   public enum Configurations {
      BROKER("kafka.brokerUrl")
     ,KEY_SERIALIZER("kafka.keySerializer")
@@ -60,6 +67,15 @@ public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObj
       return null;
     }
   }
+
+  /**
+   * Default batch size in bytes. Note, we don't want to expose this to clients. End users
should
+   * set the writer batchSize setting via the BulkWriterComponent.
+   *
+   * @see ProducerConfig#BATCH_SIZE_DOC
+   * @see <a href="https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.4/bk_kafka-component-guide/content/kafka-producer-settings.html">https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.4/bk_kafka-component-guide/content/kafka-producer-settings.html</a>
+   */
+  private static final int DEFAULT_BATCH_SIZE = 1_024 * 64; // 64 kilobytes
   private String brokerUrl;
   private String keySerializer = "org.apache.kafka.common.serialization.StringSerializer";
   private String valueSerializer = "org.apache.kafka.common.serialization.StringSerializer";
@@ -156,33 +172,69 @@ public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObj
     }
   }
 
+  @Override
+  public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration config)
+      throws Exception {
+    if(this.zkQuorum != null && this.brokerUrl == null) {
+      try {
+        this.brokerUrl = Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(this.zkQuorum));
+      } catch (Exception e) {
+        throw new IllegalStateException("Cannot read kafka brokers from zookeeper and you
didn't specify them, giving up!", e);
+      }
+    }
+    this.kafkaProducer = new KafkaProducer<>(createProducerConfigs());
+  }
+
   public Map<String, Object> createProducerConfigs() {
     Map<String, Object> producerConfig = new HashMap<>();
     producerConfig.put("bootstrap.servers", brokerUrl);
     producerConfig.put("key.serializer", keySerializer);
     producerConfig.put("value.serializer", valueSerializer);
     producerConfig.put("request.required.acks", requiredAcks);
+    producerConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, DEFAULT_BATCH_SIZE);
     producerConfig.putAll(producerConfigs == null?new HashMap<>():producerConfigs);
     producerConfig = KafkaUtils.INSTANCE.normalizeProtocol(producerConfig);
     return producerConfig;
   }
 
   @Override
-  public void init() {
-    if(this.zkQuorum != null && this.brokerUrl == null) {
+  public BulkWriterResponse write(String sensorType, WriterConfiguration configurations,
+      Iterable<Tuple> tuples, List<JSONObject> messages) {
+    BulkWriterResponse writerResponse = new BulkWriterResponse();
+    List<Map.Entry<Tuple, Future>> results = new ArrayList<>();
+    int i = 0;
+    for (Tuple tuple : tuples) {
+      JSONObject message = messages.get(i++);
+      String jsonMessage;
       try {
-        this.brokerUrl = Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(this.zkQuorum));
-      } catch (Exception e) {
-        throw new IllegalStateException("Cannot read kafka brokers from zookeeper and you
didn't specify them, giving up!", e);
+         jsonMessage = message.toJSONString();
+      } catch (Throwable t) {
+        writerResponse.addError(t, tuple);
+        continue;
       }
+      Future future = kafkaProducer
+          .send(new ProducerRecord<String, String>(kafkaTopic, jsonMessage));
+      // we want to manage the batching
+      results.add(new AbstractMap.SimpleEntry<>(tuple, future));
     }
-    this.kafkaProducer = new KafkaProducer<>(createProducerConfigs());
-  }
 
-  @SuppressWarnings("unchecked")
-  @Override
-  public void write(String sourceType, WriterConfiguration configurations, Tuple tuple, JSONObject
message) throws Exception {
-    kafkaProducer.send(new ProducerRecord<String, String>(kafkaTopic, message.toJSONString()));
+    try {
+      // ensures all Future.isDone() == true
+      kafkaProducer.flush();
+    } catch (InterruptException e) {
+      writerResponse.addAllErrors(e, tuples);
+      return writerResponse;
+    }
+
+    for (Map.Entry<Tuple, Future> kv : results) {
+      try {
+        kv.getValue().get();
+        writerResponse.addSuccess(kv.getKey());
+      } catch (Exception e) {
+        writerResponse.addError(e, kv.getKey());
+      }
+    }
+    return writerResponse;
   }
 
   @Override


Mime
View raw message