metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject [1/5] incubator-metron git commit: METRON-154: Decouple enrichment and indexing closes apache/incubator-metron#192
Date Wed, 20 Jul 2016 13:14:24 GMT
Repository: incubator-metron
Updated Branches:
  refs/heads/master 1c568bf04 -> 5ffcef8d4


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
new file mode 100644
index 0000000..0225137
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.util.MD5FileUtils;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.log4j.Logger;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.hdfs.common.rotation.RotationAction;
+import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
+import org.json.simple.JSONObject;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.*;
+
+public class SourceHandler {
+  private static final Logger LOG = Logger.getLogger(SourceHandler.class);
+  List<RotationAction> rotationActions = new ArrayList<>();
+  FileRotationPolicy rotationPolicy;
+  SyncPolicy syncPolicy;
+  FileNameFormat fileNameFormat;
+  private long offset = 0;
+  private int rotation = 0;
+  private transient FSDataOutputStream out;
+  private transient Object writeLock;
+  protected transient Timer rotationTimer; // only used for TimedRotationPolicy
+  protected transient FileSystem fs;
+  protected transient Path currentFile;
+  public SourceHandler(List<RotationAction> rotationActions
+                      , FileRotationPolicy rotationPolicy
+                      , SyncPolicy syncPolicy
+                      , FileNameFormat fileNameFormat
+                      , Map config
+                      ) throws IOException {
+    this.rotationActions = rotationActions;
+    this.rotationPolicy = rotationPolicy;
+    this.syncPolicy = syncPolicy;
+    this.fileNameFormat = fileNameFormat;
+    initialize(config);
+  }
+
+  public void handle(List<JSONObject> messages) throws Exception{
+
+    for(JSONObject message : messages) {
+      byte[] bytes = (message.toJSONString() + "\n").getBytes();
+      synchronized (this.writeLock) {
+        out.write(bytes);
+        this.offset += bytes.length;
+
+        if (this.syncPolicy.mark(null, this.offset)) {
+          if (this.out instanceof HdfsDataOutputStream) {
+            ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
+          } else {
+            this.out.hsync();
+          }
+          this.syncPolicy.reset();
+        }
+      }
+
+      if (this.rotationPolicy.mark(null, this.offset)) {
+        rotateOutputFile(); // synchronized
+        this.offset = 0;
+        this.rotationPolicy.reset();
+      }
+    }
+  }
+
+  private void initialize(Map config) throws IOException {
+    this.writeLock = new Object();
+    Configuration hdfsConfig = new Configuration();
+    this.fs = FileSystem.get(new Configuration());
+    HdfsSecurityUtil.login(config, hdfsConfig);
+    this.currentFile = createOutputFile();
+    if(this.rotationPolicy instanceof TimedRotationPolicy){
+      long interval = ((TimedRotationPolicy)this.rotationPolicy).getInterval();
+      this.rotationTimer = new Timer(true);
+      TimerTask task = new TimerTask() {
+        @Override
+        public void run() {
+          try {
+            rotateOutputFile();
+          } catch(IOException e){
+            LOG.warn("IOException during scheduled file rotation.", e);
+          }
+        }
+      };
+      this.rotationTimer.scheduleAtFixedRate(task, interval, interval);
+    }
+  }
+
+  protected void rotateOutputFile() throws IOException {
+    LOG.info("Rotating output file...");
+    long start = System.currentTimeMillis();
+    synchronized (this.writeLock) {
+      closeOutputFile();
+      this.rotation++;
+
+      Path newFile = createOutputFile();
+      LOG.info("Performing " +  this.rotationActions.size() + " file rotation actions." );
+      for (RotationAction action : this.rotationActions) {
+        action.execute(this.fs, this.currentFile);
+      }
+      this.currentFile = newFile;
+    }
+    long time = System.currentTimeMillis() - start;
+    LOG.info("File rotation took " + time + " ms.");
+  }
+
+  private Path createOutputFile() throws IOException {
+    Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation,
System.currentTimeMillis()));
+    if(fs.getScheme().equals("file")) {
+      //in the situation where we're running this in a local filesystem, flushing doesn't
work.
+      fs.mkdirs(path.getParent());
+      this.out = new FSDataOutputStream(new FileOutputStream(path.toString()), null);
+    }
+    else {
+      this.out = this.fs.create(path);
+    }
+    return path;
+  }
+
+  private void closeOutputFile() throws IOException {
+    this.out.close();
+  }
+
+
+  public void close() {
+    try {
+      closeOutputFile();
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to close output file.", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
new file mode 100644
index 0000000..f6acf6f
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer.kafka;
+
+import backtype.storm.tuple.Tuple;
+import com.google.common.base.Joiner;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.interfaces.MessageWriter;
+import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.metron.common.utils.StringUtils;
+import org.apache.metron.writer.AbstractWriter;
+import org.json.simple.JSONObject;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObject>,
Serializable {
+  public enum Configurations {
+     BROKER("kafka.brokerUrl")
+    ,KEY_SERIALIZER("kafka.keySerializer")
+    ,ZK_QUORUM("kafka.zkQuorum")
+    ,VALUE_SERIALIZER("kafka.valueSerializer")
+    ,REQUIRED_ACKS("kafka.requiredAcks")
+    ,TOPIC("kafka.topic")
+    ,PRODUCER_CONFIGS("kafka.producerConfigs");
+    ;
+    String key;
+    Configurations(String key) {
+      this.key = key;
+    }
+    public Object get(Optional<String> configPrefix, Map<String, Object> config)
{
+      return config.get(StringUtils.join(".", configPrefix, Optional.of(key)));
+    }
+    public <T> T getAndConvert(Optional<String> configPrefix, Map<String,
Object> config, Class<T> clazz) {
+      Object o = get(configPrefix, config);
+      if(o != null) {
+        return ConversionUtils.convert(o, clazz);
+      }
+      return null;
+    }
+  }
+  private String brokerUrl;
+  private String keySerializer = "org.apache.kafka.common.serialization.StringSerializer";
+  private String valueSerializer = "org.apache.kafka.common.serialization.StringSerializer";
+  private int requiredAcks = 1;
+  private String kafkaTopic = Constants.ENRICHMENT_TOPIC;
+  private KafkaProducer kafkaProducer;
+  private String configPrefix = null;
+  private String zkQuorum = null;
+  private Map<String, Object> producerConfigs = new HashMap<>();
+
+  public KafkaWriter() {}
+
+  public KafkaWriter(String brokerUrl) {
+    this.brokerUrl = brokerUrl;
+  }
+
+  public KafkaWriter withZkQuorum(String zkQuorum) {
+    this.zkQuorum = zkQuorum;
+    return this;
+  }
+  public KafkaWriter withKeySerializer(String keySerializer) {
+    this.keySerializer = keySerializer;
+    return this;
+  }
+
+  public KafkaWriter withValueSerializer(String valueSerializer) {
+    this.valueSerializer = valueSerializer;
+    return this;
+  }
+
+  public KafkaWriter withRequiredAcks(Integer requiredAcks) {
+    this.requiredAcks = requiredAcks;
+    return this;
+  }
+
+  public KafkaWriter withTopic(String topic) {
+    this.kafkaTopic= topic;
+    return this;
+  }
+  public KafkaWriter withConfigPrefix(String prefix) {
+    this.configPrefix = prefix;
+    return this;
+  }
+
+  public KafkaWriter withProducerConfigs(Map<String, Object> extraConfigs) {
+    this.producerConfigs = extraConfigs;
+    return this;
+  }
+
+  public Optional<String> getConfigPrefix() {
+    return Optional.ofNullable(configPrefix);
+  }
+
+  @Override
+  public void configure(String sensorName, WriterConfiguration configuration) {
+    Map<String, Object> configMap = configuration.getSensorConfig(sensorName);
+    String brokerUrl = Configurations.BROKER.getAndConvert(getConfigPrefix(), configMap,
String.class);
+    if(brokerUrl != null) {
+      this.brokerUrl = brokerUrl;
+    }
+    String zkQuorum = Configurations.ZK_QUORUM.getAndConvert(getConfigPrefix(), configMap,
String.class);
+    if(zkQuorum != null) {
+      withZkQuorum(zkQuorum);
+    }
+    String keySerializer = Configurations.KEY_SERIALIZER.getAndConvert(getConfigPrefix(),
configMap, String.class);
+    if(keySerializer != null) {
+      withKeySerializer(keySerializer);
+    }
+    String valueSerializer = Configurations.VALUE_SERIALIZER.getAndConvert(getConfigPrefix(),
configMap, String.class);
+    if(valueSerializer != null) {
+      withValueSerializer(keySerializer);
+    }
+    Integer requiredAcks = Configurations.REQUIRED_ACKS.getAndConvert(getConfigPrefix(),
configMap, Integer.class);
+    if(requiredAcks!= null) {
+      withRequiredAcks(requiredAcks);
+    }
+    String topic = Configurations.TOPIC.getAndConvert(getConfigPrefix(), configMap, String.class);
+    if(topic != null) {
+      withTopic(topic);
+    }
+    Map<String, Object> producerConfigs = (Map)Configurations.PRODUCER_CONFIGS.get(getConfigPrefix(),
configMap);
+    if(producerConfigs != null) {
+      withProducerConfigs(producerConfigs);
+    }
+  }
+
+  public Map<String, Object> createProducerConfigs() {
+    Map<String, Object> producerConfig = new HashMap<>();
+    producerConfig.put("bootstrap.servers", brokerUrl);
+    producerConfig.put("key.serializer", keySerializer);
+    producerConfig.put("value.serializer", valueSerializer);
+    producerConfig.put("request.required.acks", requiredAcks);
+    producerConfig.putAll(producerConfigs == null?new HashMap<>():producerConfigs);
+    return producerConfig;
+  }
+
+  @Override
+  public void init() {
+    if(this.zkQuorum != null && this.brokerUrl == null) {
+      try {
+        this.brokerUrl = Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(this.zkQuorum));
+      } catch (Exception e) {
+        throw new IllegalStateException("Cannot read kafka brokers from zookeeper and you
didn't specify them, giving up!", e);
+      }
+    }
+    this.kafkaProducer = new KafkaProducer<>(createProducerConfigs());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void write(String sourceType, WriterConfiguration configurations, Tuple tuple, JSONObject
message) throws Exception {
+    kafkaProducer.send(new ProducerRecord<String, String>(kafkaTopic, message.toJSONString()));
+  }
+
+  @Override
+  public void close() throws Exception {
+    kafkaProducer.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/MessageGetter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/MessageGetter.java
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/MessageGetter.java
new file mode 100644
index 0000000..49bab6f
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/MessageGetter.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer.message;
+
+import backtype.storm.tuple.Tuple;
+import org.json.simple.JSONObject;
+
+public interface MessageGetter {
+  JSONObject getMessage(Tuple t);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/MessageGetters.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/MessageGetters.java
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/MessageGetters.java
new file mode 100644
index 0000000..f25ea62
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/MessageGetters.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer.message;
+
+
+import backtype.storm.tuple.Tuple;
+import org.json.simple.JSONObject;
+
+public enum MessageGetters implements MessageGetter{
+   RAW(RawMessageGetter.DEFAULT)
+  ,NAMED(NamedMessageGetter.DEFAULT)
+  ;
+  MessageGetter getter;
+  MessageGetters(MessageGetter getter) {
+    this.getter = getter;
+  }
+  @Override
+  public JSONObject getMessage(Tuple t) {
+    return getter.getMessage(t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/NamedMessageGetter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/NamedMessageGetter.java
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/NamedMessageGetter.java
new file mode 100644
index 0000000..70cfab2
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/NamedMessageGetter.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer.message;
+
+import backtype.storm.tuple.Tuple;
+import org.json.simple.JSONObject;
+
+public class NamedMessageGetter implements MessageGetter {
+  public static NamedMessageGetter DEFAULT = new NamedMessageGetter("message");
+  private String messageName;
+  public NamedMessageGetter(String name) {
+    this.messageName = name;
+  }
+  @Override
+  public JSONObject getMessage(Tuple tuple) {
+    return (JSONObject)tuple.getValueByField(messageName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/RawMessageGetter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/RawMessageGetter.java
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/RawMessageGetter.java
new file mode 100644
index 0000000..16c0173
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/RawMessageGetter.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer.message;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.common.utils.JSONUtils;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+
+import java.io.UnsupportedEncodingException;
+
+public class RawMessageGetter implements MessageGetter {
+  public static RawMessageGetter DEFAULT = new RawMessageGetter(0);
+  private ThreadLocal<JSONParser> parser = new ThreadLocal<JSONParser>() {
+    @Override
+    protected JSONParser initialValue() {
+      return new JSONParser();
+    }
+  };
+  int position = 0;
+  public RawMessageGetter(int position) {
+    this.position = position;
+  }
+  @Override
+  public JSONObject getMessage(Tuple t) {
+    byte[] data = t.getBinary(position);
+    try {
+      return (JSONObject) parser.get().parse(new String(data, "UTF8"));
+    } catch (Exception e) {
+      throw new IllegalStateException(e.getMessage(), e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/NoopWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/NoopWriterTest.java
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/NoopWriterTest.java
new file mode 100644
index 0000000..3a5a762
--- /dev/null
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/NoopWriterTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class NoopWriterTest {
+  @Test
+  public void testFixedLatencyConfig() {
+    NoopWriter writer = new NoopWriter().withLatency("10");
+    Assert.assertTrue(writer.sleepFunction instanceof NoopWriter.FixedLatency);
+    NoopWriter.FixedLatency sleepFunction = (NoopWriter.FixedLatency)writer.sleepFunction;
+    Assert.assertEquals(10, sleepFunction.getLatency());
+  }
+
+  private void ensureRandomLatencyConfig(String latencyConfig, int min, int max) {
+    NoopWriter writer = new NoopWriter().withLatency(latencyConfig);
+    Assert.assertTrue(writer.sleepFunction instanceof NoopWriter.RandomLatency);
+    NoopWriter.RandomLatency sleepFunction = (NoopWriter.RandomLatency)writer.sleepFunction;
+    Assert.assertEquals(min, sleepFunction.getMin());
+    Assert.assertEquals(max, sleepFunction.getMax());
+  }
+
+  @Test
+  public void testRandomLatencyConfig() {
+    ensureRandomLatencyConfig("10,20", 10, 20);
+    ensureRandomLatencyConfig("10, 20", 10, 20);
+    ensureRandomLatencyConfig("10 ,20", 10, 20);
+    ensureRandomLatencyConfig("10 , 20", 10, 20);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java
new file mode 100644
index 0000000..1b95430
--- /dev/null
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer.kafka;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class KafkaWriterTest {
+
+
+  public static final String SENSOR_TYPE = "test";
+  public WriterConfiguration createConfiguration(final Map<String, Object> parserConfig)
{
+    ParserConfigurations configurations = new ParserConfigurations();
+    configurations.updateSensorParserConfig( SENSOR_TYPE
+                                           , new SensorParserConfig() {{
+                                              setParserConfig(parserConfig);
+                                                                      }}
+                                           );
+    return new ParserWriterConfiguration(configurations);
+  }
+
+  @Test
+  public void testHappyPathGlobalConfig() throws Exception {
+    KafkaWriter writer = new KafkaWriter();
+    WriterConfiguration configuration = createConfiguration(
+            new HashMap<String, Object>() {{
+              put("kafka.brokerUrl" , "localhost:6667");
+              put("kafka.topic" , SENSOR_TYPE);
+              put("kafka.producerConfigs" , ImmutableMap.of("key1", 1, "key2", "value2"));
+            }}
+    );
+
+    writer.configure(SENSOR_TYPE, configuration);
+    Map<String, Object> producerConfigs = writer.createProducerConfigs();
+    Assert.assertEquals(producerConfigs.get("bootstrap.servers"), "localhost:6667");
+    Assert.assertEquals(producerConfigs.get("key.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
+    Assert.assertEquals(producerConfigs.get("value.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
+    Assert.assertEquals(producerConfigs.get("request.required.acks"), 1);
+    Assert.assertEquals(producerConfigs.get("key1"), 1);
+    Assert.assertEquals(producerConfigs.get("key2"), "value2");
+  }
+
+  @Test
+  public void testHappyPathGlobalConfigWithPrefix() throws Exception {
+    KafkaWriter writer = new KafkaWriter();
+    writer.withConfigPrefix("prefix");
+    WriterConfiguration configuration = createConfiguration(
+            new HashMap<String, Object>() {{
+              put("prefix.kafka.brokerUrl" , "localhost:6667");
+              put("prefix.kafka.topic" , SENSOR_TYPE);
+              put("prefix.kafka.producerConfigs" , ImmutableMap.of("key1", 1, "key2", "value2"));
+            }}
+    );
+
+    writer.configure(SENSOR_TYPE, configuration);
+    Map<String, Object> producerConfigs = writer.createProducerConfigs();
+    Assert.assertEquals(producerConfigs.get("bootstrap.servers"), "localhost:6667");
+    Assert.assertEquals(producerConfigs.get("key.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
+    Assert.assertEquals(producerConfigs.get("value.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
+    Assert.assertEquals(producerConfigs.get("request.required.acks"), 1);
+    Assert.assertEquals(producerConfigs.get("key1"), 1);
+    Assert.assertEquals(producerConfigs.get("key2"), "value2");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/pom.xml b/metron-platform/pom.xml
index ec274f0..1f8d5f1 100644
--- a/metron-platform/pom.xml
+++ b/metron-platform/pom.xml
@@ -51,6 +51,8 @@
 		<module>metron-integration-test</module>
 		<module>metron-test-utilities</module>
 		<module>metron-api</module>
+		<module>metron-indexing</module>
+		<module>metron-writer</module>
 		<module>metron-hbase</module>
 		<module>elasticsearch-shaded</module>
 		<module>metron-elasticsearch</module>


Mime
View raw message