metron-commits mailing list archives

From ceste...@apache.org
Subject [4/5] incubator-metron git commit: METRON-154: Decouple enrichment and indexing closes apache/incubator-metron#192
Date Wed, 20 Jul 2016 13:14:27 GMT
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-enrichment/src/main/flux/enrichment/test.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/flux/enrichment/test.yaml b/metron-platform/metron-enrichment/src/main/flux/enrichment/test.yaml
index 226b686..0f391c4 100644
--- a/metron-platform/metron-enrichment/src/main/flux/enrichment/test.yaml
+++ b/metron-platform/metron-enrichment/src/main/flux/enrichment/test.yaml
@@ -21,7 +21,7 @@ config:
 components:
 # Enrichment
     -   id: "geoEnrichmentAdapter"
-        className: "org.apache.metron.integration.mock.MockGeoAdapter"
+        className: "org.apache.metron.enrichment.integration.mock.MockGeoAdapter"
     -   id: "geoEnrichment"
         className: "org.apache.metron.enrichment.configuration.Enrichment"
         constructorArgs:
@@ -111,27 +111,17 @@ components:
                 args:
                     - ref: "simpleHBaseThreatIntelEnrichment"
 
-    -   id: "fileNameFormat"
-        className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat"
-        configMethods:
-            -   name: "withPrefix"
-                args:
-                    - "enrichment-"
-            -   name: "withExtension"
-                args:
-                  - ".json"
-            -   name: "withPath"
-                args:
-                    - "${index.hdfs.output}"
+
 #indexing
-    -   id: "hdfsWriter"
-        className: "org.apache.metron.writer.hdfs.HdfsWriter"
+    -   id: "kafkaWriter"
+        className: "org.apache.metron.writer.kafka.KafkaWriter"
         configMethods:
-            -   name: "withFileNameFormat"
+            -   name: "withTopic"
                 args:
-                    - ref: "fileNameFormat"
-    -   id: "indexWriter"
-        className: "${writer.class.name}"
+                    - "${enrichment.output.topic}"
+            -   name: "withZkQuorum"
+                args:
+                    - "${kafka.zk}"
 
 #kafka/zookeeper
     -   id: "zkHosts"
@@ -249,22 +239,14 @@ bolts:
             -   name: "withMaxTimeRetain"
                 args: [10]
 # Indexing Bolts
-    -   id: "indexingBolt"
-        className: "org.apache.metron.enrichment.bolt.BulkMessageWriterBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withBulkMessageWriter"
-                args:
-                    - ref: "indexWriter"
-    -   id: "hdfsIndexingBolt"
-        className: "org.apache.metron.enrichment.bolt.BulkMessageWriterBolt"
+    -   id: "outputBolt"
+        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
         constructorArgs:
             - "${kafka.zk}"
         configMethods:
-            -   name: "withBulkMessageWriter"
+            -   name: "withMessageWriter"
                 args:
-                    - ref: "hdfsWriter"
+                    - ref: "kafkaWriter"
 
 
 streams:
@@ -361,24 +343,12 @@ streams:
             streamId: "message"
             type: FIELDS
             args: ["key"]
-#indexing
-    -   name: "threatIntelJoin -> indexing"
+
+#output
+    -   name: "threatIntelJoin -> output"
         from: "threatIntelJoinBolt"
-        to: "indexingBolt"
+        to: "outputBolt"
         grouping:
             streamId: "message"
             type: FIELDS
             args: ["key"]
-    -   name: "threatIntelJoin -> hdfs"
-        from: "threatIntelJoinBolt"
-        to: "hdfsIndexingBolt"
-        grouping:
-            streamId: "message"
-            type: SHUFFLE
-
-    -   name: "indexingBolt -> errorIndexingBolt"
-        from: "indexingBolt"
-        to: "indexingBolt"
-        grouping:
-            streamId: "error"
-            type: SHUFFLE

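The flux change above is the heart of the decoupling: the fileNameFormat, hdfsWriter and indexWriter components and the indexingBolt/hdfsIndexingBolt pair are removed, and a single outputBolt now hands enriched messages back to Kafka through a KafkaWriter. A minimal Java sketch of what those flux definitions resolve to at runtime (Flux invokes the configMethods reflectively; the topic and quorum literals stand in for ${enrichment.output.topic} and ${kafka.zk}):

    import org.apache.metron.writer.bolt.BulkMessageWriterBolt;
    import org.apache.metron.writer.kafka.KafkaWriter;

    public class OutputBoltWiring {
      public static BulkMessageWriterBolt build() {
        // components section: kafkaWriter plus its two configMethods
        KafkaWriter kafkaWriter = new KafkaWriter();
        kafkaWriter.withTopic("indexing");       // placeholder for ${enrichment.output.topic}
        kafkaWriter.withZkQuorum("node1:2181");  // placeholder for ${kafka.zk}

        // bolts section: outputBolt wraps the writer via withMessageWriter
        BulkMessageWriterBolt outputBolt = new BulkMessageWriterBolt("node1:2181");
        outputBolt.withMessageWriter(kafkaWriter);
        return outputBolt;
      }
    }
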
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java
deleted file mode 100644
index 772dfc6..0000000
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.bolt;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
-import org.apache.metron.common.configuration.writer.EnrichmentWriterConfiguration;
-import org.apache.metron.common.utils.ErrorUtils;
-import org.apache.metron.common.utils.MessageUtils;
-import org.apache.metron.common.interfaces.BulkMessageWriter;
-import org.apache.metron.common.writer.BulkWriterComponent;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-public class BulkMessageWriterBolt extends ConfiguredEnrichmentBolt {
-
-  private static final Logger LOG = LoggerFactory
-          .getLogger(BulkMessageWriterBolt.class);
-  private BulkMessageWriter<JSONObject> bulkMessageWriter;
-  private BulkWriterComponent<JSONObject> writerComponent;
-  public BulkMessageWriterBolt(String zookeeperUrl) {
-    super(zookeeperUrl);
-  }
-
-  public BulkMessageWriterBolt withBulkMessageWriter(BulkMessageWriter<JSONObject > bulkMessageWriter) {
-    this.bulkMessageWriter = bulkMessageWriter;
-    return this;
-  }
-
-  @Override
-  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-    this.writerComponent = new BulkWriterComponent<>(collector);
-    super.prepare(stormConf, context, collector);
-    try {
-      bulkMessageWriter.init(stormConf, new EnrichmentWriterConfiguration(getConfigurations()));
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void execute(Tuple tuple) {
-    JSONObject message =(JSONObject)tuple.getValueByField("message");
-    String sensorType = MessageUtils.getSensorType(message);
-    try
-    {
-      writerComponent.write(sensorType, tuple, message, bulkMessageWriter, new EnrichmentWriterConfiguration(getConfigurations()));
-    }
-    catch(Exception e) {
-      throw new RuntimeException("This should have been caught in the writerComponent.  If you see this, file a JIRA", e);
-    }
-  }
-
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    declarer.declareStream(Constants.ERROR_STREAM, new Fields("message"));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/BloomAccessTracker.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/BloomAccessTracker.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/BloomAccessTracker.java
index 3c38824..3b7e437 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/BloomAccessTracker.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/BloomAccessTracker.java
@@ -17,13 +17,12 @@
  */
 package org.apache.metron.enrichment.lookup.accesstracker;
 
-import com.google.common.hash.BloomFilter;
-import com.google.common.hash.Funnel;
-import com.google.common.hash.PrimitiveSink;
+import org.apache.metron.common.utils.BloomFilter;
 import org.apache.metron.enrichment.lookup.LookupKey;
 
 import java.io.*;
 import java.util.Map;
+import java.util.function.Function;
 
 public class BloomAccessTracker implements AccessTracker {
     private static final long serialVersionUID = 1L;
@@ -31,25 +30,14 @@ public class BloomAccessTracker implements AccessTracker {
     public static final String FALSE_POSITIVE_RATE_KEY = "false_positive_rate";
     public static final String NAME_KEY = "name";
 
-    private static class LookupKeyFunnel implements Funnel<LookupKey> {
-        @Override
-        public void funnel(LookupKey lookupKey, PrimitiveSink primitiveSink) {
-            primitiveSink.putBytes(lookupKey.toBytes());
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            return this.getClass().equals(obj.getClass());
-        }
 
+    public static class LookupKeySerializer implements Function<LookupKey, byte[]>, Serializable {
         @Override
-        public int hashCode() {
-            return super.hashCode() * 31;
+        public byte[] apply(LookupKey lookupKey) {
+            return lookupKey.toBytes();
         }
     }
 
-    private static Funnel<LookupKey> LOOKUPKEY_FUNNEL = new LookupKeyFunnel();
-
     BloomFilter<LookupKey> filter;
     String name;
     int expectedInsertions;
@@ -60,7 +48,7 @@ public class BloomAccessTracker implements AccessTracker {
         this.name = name;
         this.expectedInsertions = expectedInsertions;
         this.falsePositiveRate = falsePositiveRate;
-        filter = BloomFilter.create(LOOKUPKEY_FUNNEL, expectedInsertions, falsePositiveRate);
+        filter = new BloomFilter<LookupKey>(new LookupKeySerializer(), expectedInsertions, falsePositiveRate);
     }
     public BloomAccessTracker() {}
     public BloomAccessTracker(Map<String, Object> config) {
@@ -73,7 +61,7 @@ public class BloomAccessTracker implements AccessTracker {
     @Override
     public void logAccess(LookupKey key) {
         numInsertions++;
-        filter.put(key);
+        filter.add(key);
     }
 
     @Override
@@ -81,7 +69,7 @@ public class BloomAccessTracker implements AccessTracker {
         expectedInsertions = toInt(config.get(EXPECTED_INSERTIONS_KEY));
         falsePositiveRate = toDouble(config.get(FALSE_POSITIVE_RATE_KEY));
         name = config.get(NAME_KEY).toString();
-        filter = BloomFilter.create(LOOKUPKEY_FUNNEL, expectedInsertions, falsePositiveRate);
+        filter = new BloomFilter<LookupKey>(new LookupKeySerializer(), expectedInsertions, falsePositiveRate);
     }
 
     @Override
@@ -91,7 +79,7 @@ public class BloomAccessTracker implements AccessTracker {
 
     @Override
     public void reset() {
-        filter = BloomFilter.create(LOOKUPKEY_FUNNEL, expectedInsertions, falsePositiveRate);
+        filter = new BloomFilter<LookupKey>(new LookupKeySerializer(), expectedInsertions, falsePositiveRate);
     }
 
     private static double toDouble(Object o) {
@@ -129,7 +117,7 @@ public class BloomAccessTracker implements AccessTracker {
             throw new IllegalStateException("Unable to union access tracker, because this tracker is not initialized.");
         }
         if(tracker instanceof BloomAccessTracker ) {
-            filter.putAll(((BloomAccessTracker)tracker).getFilter());
+            filter.merge(((BloomAccessTracker)tracker).getFilter());
             return this;
         }
         else {

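The BloomAccessTracker change swaps Guava's BloomFilter, which required a Funnel, for Metron's own org.apache.metron.common.utils.BloomFilter, which takes a serializable Function<LookupKey, byte[]> instead, eliminating the Funnel equals/hashCode boilerplate. A usage sketch inferred from the replaced calls (Guava's put becomes add, putAll becomes merge); the key and second filter arguments are hypothetical:

    import org.apache.metron.common.utils.BloomFilter;
    import org.apache.metron.enrichment.lookup.LookupKey;
    import org.apache.metron.enrichment.lookup.accesstracker.BloomAccessTracker;

    public class BloomFilterSketch {
      public static void demo(LookupKey key, BloomFilter<LookupKey> other) {
        BloomFilter<LookupKey> filter = new BloomFilter<>(
            new BloomAccessTracker.LookupKeySerializer(), // Function<LookupKey, byte[]>
            10000,    // expected insertions
            0.01);    // false positive rate
        filter.add(key);      // Guava equivalent: filter.put(key)
        filter.merge(other);  // Guava equivalent: filter.putAll(other)
      }
    }
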
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/writer/SimpleHbaseEnrichmentWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/writer/SimpleHbaseEnrichmentWriter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/writer/SimpleHbaseEnrichmentWriter.java
new file mode 100644
index 0000000..cfdae33
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/writer/SimpleHbaseEnrichmentWriter.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.enrichment.writer;
+
+import backtype.storm.tuple.Tuple;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.interfaces.BulkMessageWriter;
+import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.metron.common.utils.ReflectionUtils;
+import org.apache.metron.enrichment.converter.EnrichmentConverter;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.writer.AbstractWriter;
+import org.json.simple.JSONObject;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class SimpleHbaseEnrichmentWriter extends AbstractWriter implements BulkMessageWriter<JSONObject>, Serializable {
+  public enum Configurations {
+    HBASE_TABLE("shew.table")
+    ,HBASE_CF("shew.cf")
+    ,KEY_COLUMNS("shew.keyColumns")
+    ,KEY_DELIM("shew.keyDelim")
+    ,ENRICHMENT_TYPE("shew.enrichmentType")
+    ,VALUE_COLUMNS("shew.valueColumns")
+    ,HBASE_PROVIDER("shew.hbaseProvider")
+    ;
+    String key;
+    Configurations(String key) {
+      this.key = key;
+    }
+    public String getKey() {
+      return key;
+    }
+    public Object get(Map<String, Object> config) {
+      return config.get(key);
+    }
+    public <T> T getAndConvert(Map<String, Object> config, Class<T> clazz) {
+      Object o = get(config);
+      if(o != null) {
+        return ConversionUtils.convert(o, clazz);
+      }
+      return null;
+    }
+  }
+  public static class KeyTransformer {
+    List<String> keys = new ArrayList<>();
+    Set<String> keySet;
+    private String delim = ":";
+    public KeyTransformer(String key) {
+      this(key, null);
+    }
+    public KeyTransformer(String key, String delim) {
+      keys.add(key);
+      keySet = new HashSet<>(this.keys);
+      this.delim = delim == null?this.delim:delim;
+    }
+    public KeyTransformer(Iterable<String> keys) {
+      this(keys, null);
+    }
+    public KeyTransformer(Iterable<String> keys, String delim) {
+      Iterables.addAll(this.keys, keys);
+      keySet = new HashSet<>(this.keys);
+      this.delim = delim == null?this.delim:delim;
+    }
+
+    public String transform(final JSONObject message) {
+      return
+      keys.stream().map( x -> {
+        Object o = message.get(x);
+        return o == null?"":o.toString();
+      }).collect(Collectors.joining(delim));
+    }
+  }
+  private transient EnrichmentConverter converter;
+  private String tableName;
+  private String cf;
+  private HTableInterface table;
+  private TableProvider provider;
+  private Map.Entry<Object, KeyTransformer> keyTransformer;
+
+  public SimpleHbaseEnrichmentWriter() {
+  }
+
+  @Override
+  public void configure(String sensorName, WriterConfiguration configuration) {
+    String hbaseProviderImpl = Configurations.HBASE_PROVIDER.getAndConvert(configuration.getSensorConfig(sensorName),String.class);
+    if(hbaseProviderImpl != null) {
+      provider = ReflectionUtils.createInstance(hbaseProviderImpl);
+    }
+    if(converter == null) {
+      converter = new EnrichmentConverter();
+    }
+  }
+
+  @Override
+  public void init(Map stormConf, WriterConfiguration configuration) throws Exception {
+    if(converter == null) {
+      converter = new EnrichmentConverter();
+    }
+  }
+
+  protected synchronized TableProvider getProvider() {
+    if(provider == null) {
+
+      provider = new HTableProvider();
+    }
+    return provider;
+  }
+
+  public HTableInterface getTable(String tableName, String cf) throws IOException {
+    synchronized(this) {
+      boolean isInitial = this.tableName == null || this.cf == null;
+      boolean isValid = tableName != null && cf != null;
+
+      if( isInitial || (isValid && (!this.tableName.equals(tableName) || !this.cf.equals(cf)) )
+        )
+      {
+        Configuration conf = HBaseConfiguration.create();
+        //new table connection
+        if(table != null) {
+          table.close();
+        }
+        table = getProvider().getTable(conf, tableName);
+        this.tableName = tableName;
+        this.cf = cf;
+      }
+      return table;
+    }
+  }
+
+  public HTableInterface getTable(Map<String, Object> config) throws IOException {
+    return getTable(Configurations.HBASE_TABLE.getAndConvert(config, String.class)
+                   ,Configurations.HBASE_CF.getAndConvert(config, String.class)
+                   );
+
+  }
+
+
+  private List<String> getColumns(Object keyColumnsObj, boolean allowNull) {
+    Object o = keyColumnsObj;
+    if(allowNull && keyColumnsObj == null) {
+      return Collections.emptyList();
+    }
+    if(o instanceof String) {
+      return ImmutableList.of(o.toString());
+    }
+    else if (o instanceof List) {
+      List<String> keyCols = new ArrayList<>();
+      for(Object key : (List)o) {
+        keyCols.add(key.toString());
+      }
+      return keyCols;
+    }
+    else {
+      throw new RuntimeException("Unable to get columns: " + o);
+    }
+  }
+
+  private KeyTransformer getTransformer(Map<String, Object> config) {
+    Object o = Configurations.KEY_COLUMNS.get(config);
+    KeyTransformer transformer = null;
+    if(keyTransformer != null && keyTransformer.getKey() == o) {
+      return keyTransformer.getValue();
+    }
+    else {
+      List<String> keys = getColumns(o, false);
+      Object delimObj = Configurations.KEY_DELIM.get(config);
+      String delim = (delimObj == null || !(delimObj instanceof String))?null:delimObj.toString();
+      transformer = new KeyTransformer(keys, delim);
+      keyTransformer = new AbstractMap.SimpleEntry<>(o, transformer);
+      return transformer;
+    }
+  }
+
+
+  private EnrichmentValue getValue( JSONObject message
+                                  , Set<String> keyColumns
+                                  , Set<String> valueColumns
+                                  )
+  {
+    Map<String, Object> metadata = new HashMap<>();
+    if(valueColumns == null || valueColumns.isEmpty()) {
+      for (Object kv : message.entrySet()) {
+        Map.Entry<Object, Object> entry = (Map.Entry<Object, Object>) kv;
+        if (!keyColumns.contains(entry.getKey())) {
+          metadata.put(entry.getKey().toString(), entry.getValue());
+        }
+      }
+      return new EnrichmentValue(metadata);
+    }
+    else {
+      for (Object kv : message.entrySet()) {
+        Map.Entry<Object, Object> entry = (Map.Entry<Object, Object>) kv;
+        if (valueColumns.contains(entry.getKey())) {
+          metadata.put(entry.getKey().toString(), entry.getValue());
+        }
+      }
+      return new EnrichmentValue(metadata);
+    }
+  }
+
+  private EnrichmentKey getKey(JSONObject message, KeyTransformer transformer, String enrichmentType) {
+    if(enrichmentType != null) {
+      return new EnrichmentKey(enrichmentType, transformer.transform(message));
+    }
+    else {
+      return null;
+    }
+  }
+
+  @Override
+  public void write( String sensorType
+                    , WriterConfiguration configurations
+                    , Iterable<Tuple> tuples
+                    , List<JSONObject> messages
+                    ) throws Exception
+  {
+    Map<String, Object> sensorConfig = configurations.getSensorConfig(sensorType);
+    HTableInterface table = getTable(sensorConfig);
+    KeyTransformer transformer = getTransformer(sensorConfig);
+    Object enrichmentTypeObj = Configurations.ENRICHMENT_TYPE.get(sensorConfig);
+    String enrichmentType = enrichmentTypeObj == null?null:enrichmentTypeObj.toString();
+    Set<String> valueColumns = new HashSet<>(getColumns(Configurations.VALUE_COLUMNS.get(sensorConfig), true));
+    List<Put> puts = new ArrayList<>();
+    for(JSONObject message : messages) {
+      EnrichmentKey key = getKey(message, transformer, enrichmentType);
+      EnrichmentValue value = getValue(message, transformer.keySet, valueColumns);
+      if(key == null || value == null) {
+        continue;
+      }
+      Put put = converter.toPut(this.cf, key, value);
+      if(put != null) {
+        puts.add(put);
+      }
+    }
+    table.put(puts);
+  }
+
+  @Override
+  public void close() throws Exception {
+    synchronized(this) {
+      if(table != null) {
+        table.close();
+      }
+    }
+  }
+}

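The new SimpleHbaseEnrichmentWriter is configured entirely through the per-sensor config map using the shew.* keys in its Configurations enum: KEY_COLUMNS and KEY_DELIM drive the KeyTransformer that builds the HBase row key, ENRICHMENT_TYPE names the enrichment, and VALUE_COLUMNS (optional) restricts which message fields land in the enrichment value. A hypothetical sensor config illustrating the keys:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    public class ShewConfigSketch {
      public static Map<String, Object> sensorConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put("shew.table", "enrichment");                     // HBASE_TABLE
        config.put("shew.cf", "t");                                 // HBASE_CF
        config.put("shew.keyColumns", Arrays.asList("ip", "port")); // KEY_COLUMNS
        config.put("shew.keyDelim", ":");                           // KEY_DELIM
        config.put("shew.enrichmentType", "user");                  // ENRICHMENT_TYPE
        // shew.valueColumns left unset: every non-key field becomes part of
        // the EnrichmentValue; KeyTransformer.transform() yields "<ip>:<port>"
        return config;
      }
    }
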
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hbase/SimpleHbaseEnrichmentWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hbase/SimpleHbaseEnrichmentWriter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hbase/SimpleHbaseEnrichmentWriter.java
deleted file mode 100644
index f7e7aeb..0000000
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hbase/SimpleHbaseEnrichmentWriter.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.writer.hbase;
-
-import backtype.storm.tuple.Tuple;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.metron.common.configuration.Configurations;
-import org.apache.metron.common.configuration.ParserConfigurations;
-import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.common.interfaces.BulkMessageWriter;
-import org.apache.metron.common.utils.ConversionUtils;
-import org.apache.metron.common.utils.ReflectionUtils;
-import org.apache.metron.common.writer.AbstractWriter;
-import org.apache.metron.enrichment.converter.EnrichmentConverter;
-import org.apache.metron.enrichment.converter.EnrichmentKey;
-import org.apache.metron.enrichment.converter.EnrichmentValue;
-import org.apache.metron.enrichment.converter.HbaseConverter;
-import org.apache.metron.hbase.HTableProvider;
-import org.apache.metron.hbase.TableProvider;
-import org.json.simple.JSONObject;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.*;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-public class SimpleHbaseEnrichmentWriter extends AbstractWriter implements BulkMessageWriter<JSONObject>, Serializable {
-  public enum Configurations {
-    HBASE_TABLE("shew.table")
-    ,HBASE_CF("shew.cf")
-    ,KEY_COLUMNS("shew.keyColumns")
-    ,KEY_DELIM("shew.keyDelim")
-    ,ENRICHMENT_TYPE("shew.enrichmentType")
-    ,VALUE_COLUMNS("shew.valueColumns")
-    ,HBASE_PROVIDER("shew.hbaseProvider")
-    ;
-    String key;
-    Configurations(String key) {
-      this.key = key;
-    }
-    public String getKey() {
-      return key;
-    }
-    public Object get(Map<String, Object> config) {
-      return config.get(key);
-    }
-    public <T> T getAndConvert(Map<String, Object> config, Class<T> clazz) {
-      Object o = get(config);
-      if(o != null) {
-        return ConversionUtils.convert(o, clazz);
-      }
-      return null;
-    }
-  }
-  public static class KeyTransformer {
-    List<String> keys = new ArrayList<>();
-    Set<String> keySet;
-    private String delim = ":";
-    public KeyTransformer(String key) {
-      this(key, null);
-    }
-    public KeyTransformer(String key, String delim) {
-      keys.add(key);
-      keySet = new HashSet<>(this.keys);
-      this.delim = delim == null?this.delim:delim;
-    }
-    public KeyTransformer(Iterable<String> keys) {
-      this(keys, null);
-    }
-    public KeyTransformer(Iterable<String> keys, String delim) {
-      Iterables.addAll(this.keys, keys);
-      keySet = new HashSet<>(this.keys);
-      this.delim = delim == null?this.delim:delim;
-    }
-
-    public String transform(final JSONObject message) {
-      return
-      keys.stream().map( x -> {
-        Object o = message.get(x);
-        return o == null?"":o.toString();
-      }).collect(Collectors.joining(delim));
-    }
-  }
-  private transient EnrichmentConverter converter;
-  private String tableName;
-  private String cf;
-  private HTableInterface table;
-  private TableProvider provider;
-  private Map.Entry<Object, KeyTransformer> keyTransformer;
-
-  public SimpleHbaseEnrichmentWriter() {
-  }
-
-  @Override
-  public void configure(String sensorName, WriterConfiguration configuration) {
-    String hbaseProviderImpl = Configurations.HBASE_PROVIDER.getAndConvert(configuration.getSensorConfig(sensorName),String.class);
-    if(hbaseProviderImpl != null) {
-      provider = ReflectionUtils.createInstance(hbaseProviderImpl);
-    }
-    if(converter == null) {
-      converter = new EnrichmentConverter();
-    }
-  }
-
-  @Override
-  public void init(Map stormConf, WriterConfiguration configuration) throws Exception {
-    if(converter == null) {
-      converter = new EnrichmentConverter();
-    }
-  }
-
-  protected synchronized TableProvider getProvider() {
-    if(provider == null) {
-
-      provider = new HTableProvider();
-    }
-    return provider;
-  }
-
-  public HTableInterface getTable(String tableName, String cf) throws IOException {
-    synchronized(this) {
-      boolean isInitial = this.tableName == null || this.cf == null;
-      boolean isValid = tableName != null && cf != null;
-
-      if( isInitial || (isValid && (!this.tableName.equals(tableName) || !this.cf.equals(cf)) )
-        )
-      {
-        Configuration conf = HBaseConfiguration.create();
-        //new table connection
-        if(table != null) {
-          table.close();
-        }
-        table = getProvider().getTable(conf, tableName);
-        this.tableName = tableName;
-        this.cf = cf;
-      }
-      return table;
-    }
-  }
-
-  public HTableInterface getTable(Map<String, Object> config) throws IOException {
-    return getTable(Configurations.HBASE_TABLE.getAndConvert(config, String.class)
-                   ,Configurations.HBASE_CF.getAndConvert(config, String.class)
-                   );
-
-  }
-
-
-  private List<String> getColumns(Object keyColumnsObj, boolean allowNull) {
-    Object o = keyColumnsObj;
-    if(allowNull && keyColumnsObj == null) {
-      return Collections.emptyList();
-    }
-    if(o instanceof String) {
-      return ImmutableList.of(o.toString());
-    }
-    else if (o instanceof List) {
-      List<String> keyCols = new ArrayList<>();
-      for(Object key : (List)o) {
-        keyCols.add(key.toString());
-      }
-      return keyCols;
-    }
-    else {
-      throw new RuntimeException("Unable to get columns: " + o);
-    }
-  }
-
-  private KeyTransformer getTransformer(Map<String, Object> config) {
-    Object o = Configurations.KEY_COLUMNS.get(config);
-    KeyTransformer transformer = null;
-    if(keyTransformer != null && keyTransformer.getKey() == o) {
-      return keyTransformer.getValue();
-    }
-    else {
-      List<String> keys = getColumns(o, false);
-      Object delimObj = Configurations.KEY_DELIM.get(config);
-      String delim = (delimObj == null || !(delimObj instanceof String))?null:delimObj.toString();
-      transformer = new KeyTransformer(keys, delim);
-      keyTransformer = new AbstractMap.SimpleEntry<>(o, transformer);
-      return transformer;
-    }
-  }
-
-
-  private EnrichmentValue getValue( JSONObject message
-                                  , Set<String> keyColumns
-                                  , Set<String> valueColumns
-                                  )
-  {
-    Map<String, Object> metadata = new HashMap<>();
-    if(valueColumns == null || valueColumns.isEmpty()) {
-      for (Object kv : message.entrySet()) {
-        Map.Entry<Object, Object> entry = (Map.Entry<Object, Object>) kv;
-        if (!keyColumns.contains(entry.getKey())) {
-          metadata.put(entry.getKey().toString(), entry.getValue());
-        }
-      }
-      return new EnrichmentValue(metadata);
-    }
-    else {
-      for (Object kv : message.entrySet()) {
-        Map.Entry<Object, Object> entry = (Map.Entry<Object, Object>) kv;
-        if (valueColumns.contains(entry.getKey())) {
-          metadata.put(entry.getKey().toString(), entry.getValue());
-        }
-      }
-      return new EnrichmentValue(metadata);
-    }
-  }
-
-  private EnrichmentKey getKey(JSONObject message, KeyTransformer transformer, String enrichmentType) {
-    if(enrichmentType != null) {
-      return new EnrichmentKey(enrichmentType, transformer.transform(message));
-    }
-    else {
-      return null;
-    }
-  }
-
-  @Override
-  public void write( String sensorType
-                    , WriterConfiguration configurations
-                    , Iterable<Tuple> tuples
-                    , List<JSONObject> messages
-                    ) throws Exception
-  {
-    Map<String, Object> sensorConfig = configurations.getSensorConfig(sensorType);
-    HTableInterface table = getTable(sensorConfig);
-    KeyTransformer transformer = getTransformer(sensorConfig);
-    Object enrichmentTypeObj = Configurations.ENRICHMENT_TYPE.get(sensorConfig);
-    String enrichmentType = enrichmentTypeObj == null?null:enrichmentTypeObj.toString();
-    Set<String> valueColumns = new HashSet<>(getColumns(Configurations.VALUE_COLUMNS.get(sensorConfig), true));
-    List<Put> puts = new ArrayList<>();
-    for(JSONObject message : messages) {
-      EnrichmentKey key = getKey(message, transformer, enrichmentType);
-      EnrichmentValue value = getValue(message, transformer.keySet, valueColumns);
-      if(key == null || value == null) {
-        continue;
-      }
-      Put put = converter.toPut(this.cf, key, value);
-      if(put != null) {
-        puts.add(put);
-      }
-    }
-    table.put(puts);
-  }
-
-  @Override
-  public void close() throws Exception {
-    synchronized(this) {
-      if(table != null) {
-        table.close();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
deleted file mode 100644
index 01f1245..0000000
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.writer.hdfs;
-
-import backtype.storm.tuple.Tuple;
-import org.apache.metron.common.configuration.EnrichmentConfigurations;
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.common.interfaces.BulkMessageWriter;
-import org.apache.storm.hdfs.bolt.format.FileNameFormat;
-import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
-import org.apache.storm.hdfs.bolt.rotation.NoRotationPolicy;
-import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
-import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
-import org.apache.storm.hdfs.common.rotation.RotationAction;
-import org.json.simple.JSONObject;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable {
-  List<RotationAction> rotationActions = new ArrayList<>();
-  FileRotationPolicy rotationPolicy = new NoRotationPolicy();
-  SyncPolicy syncPolicy = new CountSyncPolicy(1); //sync every time, duh.
-  FileNameFormat fileNameFormat;
-  Map<String, SourceHandler> sourceHandlerMap = new HashMap<>();
-  transient Map stormConfig;
-  public HdfsWriter withFileNameFormat(FileNameFormat fileNameFormat){
-    this.fileNameFormat = fileNameFormat;
-    return this;
-  }
-
-  public HdfsWriter withSyncPolicy(SyncPolicy syncPolicy){
-    this.syncPolicy = syncPolicy;
-    return this;
-  }
-  public HdfsWriter withRotationPolicy(FileRotationPolicy rotationPolicy){
-    this.rotationPolicy = rotationPolicy;
-    return this;
-  }
-
-  public HdfsWriter addRotationAction(RotationAction action){
-    this.rotationActions.add(action);
-    return this;
-  }
-
-  @Override
-  public void init(Map stormConfig, WriterConfiguration configurations) {
-    this.stormConfig = stormConfig;
-  }
-
-
-  @Override
-  public void write(String sourceType
-                   , WriterConfiguration configurations
-                   , Iterable<Tuple> tuples
-                   , List<JSONObject> messages
-                   ) throws Exception
-  {
-    SourceHandler handler = getSourceHandler(sourceType);
-    handler.handle(messages);
-  }
-
-  @Override
-  public void close() {
-    for(SourceHandler handler : sourceHandlerMap.values()) {
-      handler.close();
-    }
-  }
-  private synchronized SourceHandler getSourceHandler(String sourceType) throws IOException {
-    SourceHandler ret = sourceHandlerMap.get(sourceType);
-    if(ret == null) {
-      ret = new SourceHandler(rotationActions, rotationPolicy, syncPolicy, new SourceFileNameFormat(sourceType, fileNameFormat), stormConfig);
-      sourceHandlerMap.put(sourceType, ret);
-    }
-    return ret;
-  }
-}

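HdfsWriter, deleted above (the enrichment topology now emits to Kafka instead, per the flux change earlier), was one implementation of the BulkMessageWriter<JSONObject> contract that BulkMessageWriterBolt drives: init once with the storm config, write a batch of messages per sensor type, close on shutdown. A minimal no-op sketch of that contract; the class name is hypothetical:

    import backtype.storm.tuple.Tuple;
    import org.apache.metron.common.configuration.writer.WriterConfiguration;
    import org.apache.metron.common.interfaces.BulkMessageWriter;
    import org.json.simple.JSONObject;

    import java.io.Serializable;
    import java.util.List;
    import java.util.Map;

    public class NoOpBulkWriter implements BulkMessageWriter<JSONObject>, Serializable {
      @Override
      public void init(Map stormConf, WriterConfiguration configurations) throws Exception {
        // open connections / allocate handles here
      }

      @Override
      public void write(String sensorType, WriterConfiguration configurations,
                        Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
        // a real writer flushes `messages` for this sensor type here
      }

      @Override
      public void close() throws Exception {
        // release resources
      }
    }
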
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java
deleted file mode 100644
index 1c345b4..0000000
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.writer.hdfs;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-import org.apache.storm.hdfs.common.rotation.RotationAction;
-
-import java.io.IOException;
-
-public class SourceAwareMoveAction implements RotationAction{
-  private static final Logger LOG = Logger.getLogger(SourceHandler.class);
-  private String destination;
-
-  public SourceAwareMoveAction toDestination(String destDir){
-    destination = destDir;
-    return this;
-  }
-
-  private static String getSource(Path filePath) {
-    return filePath.getParent().getName();
-  }
-
-  @Override
-  public void execute(FileSystem fileSystem, Path filePath) throws IOException {
-    Path destPath = new Path(new Path(destination, getSource(filePath)), filePath.getName());
-    LOG.info("Moving file " + filePath + " to " + destPath);
-    boolean success = fileSystem.rename(filePath, destPath);
-    return;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
deleted file mode 100644
index ae0242d..0000000
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.writer.hdfs;
-
-import backtype.storm.task.TopologyContext;
-import org.apache.storm.hdfs.bolt.format.FileNameFormat;
-
-import java.util.Map;
-
-public class SourceFileNameFormat implements FileNameFormat {
-  FileNameFormat delegate;
-  String sourceType;
-  public SourceFileNameFormat(String sourceType, FileNameFormat delegate) {
-    this.delegate = delegate;
-    this.sourceType = sourceType;
-  }
-
-  @Override
-  public void prepare(Map map, TopologyContext topologyContext) {
-    this.delegate.prepare(map, topologyContext);
-  }
-
-  @Override
-  public String getName(long l, long l1) {
-    return delegate.getName(l, l1);
-  }
-
-  @Override
-  public String getPath() {
-    return delegate.getPath() + "/" + sourceType;
-  }
-}

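SourceFileNameFormat, also deleted, was a small decorator over a storm-hdfs FileNameFormat that appended the source type to the delegate's path so each sensor's output landed in its own directory. A rough sketch of how it composed, assuming storm-hdfs's DefaultFileNameFormat returns its configured path from getPath():

    import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;

    public class SourceFormatSketch {
      public static void demo() {
        SourceFileNameFormat format = new SourceFileNameFormat("bro",
            new DefaultFileNameFormat().withPath("/apps/metron/enrichment"));
        // format.getPath() -> "/apps/metron/enrichment/bro"
      }
    }
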
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
deleted file mode 100644
index 0225137..0000000
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.writer.hdfs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
-import org.apache.hadoop.hdfs.util.MD5FileUtils;
-import org.apache.hadoop.io.MD5Hash;
-import org.apache.log4j.Logger;
-import org.apache.storm.hdfs.bolt.format.FileNameFormat;
-import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
-import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
-import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
-import org.apache.storm.hdfs.common.rotation.RotationAction;
-import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
-import org.json.simple.JSONObject;
-
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.*;
-
-public class SourceHandler {
-  private static final Logger LOG = Logger.getLogger(SourceHandler.class);
-  List<RotationAction> rotationActions = new ArrayList<>();
-  FileRotationPolicy rotationPolicy;
-  SyncPolicy syncPolicy;
-  FileNameFormat fileNameFormat;
-  private long offset = 0;
-  private int rotation = 0;
-  private transient FSDataOutputStream out;
-  private transient Object writeLock;
-  protected transient Timer rotationTimer; // only used for TimedRotationPolicy
-  protected transient FileSystem fs;
-  protected transient Path currentFile;
-  public SourceHandler(List<RotationAction> rotationActions
-                      , FileRotationPolicy rotationPolicy
-                      , SyncPolicy syncPolicy
-                      , FileNameFormat fileNameFormat
-                      , Map config
-                      ) throws IOException {
-    this.rotationActions = rotationActions;
-    this.rotationPolicy = rotationPolicy;
-    this.syncPolicy = syncPolicy;
-    this.fileNameFormat = fileNameFormat;
-    initialize(config);
-  }
-
-  public void handle(List<JSONObject> messages) throws Exception{
-
-    for(JSONObject message : messages) {
-      byte[] bytes = (message.toJSONString() + "\n").getBytes();
-      synchronized (this.writeLock) {
-        out.write(bytes);
-        this.offset += bytes.length;
-
-        if (this.syncPolicy.mark(null, this.offset)) {
-          if (this.out instanceof HdfsDataOutputStream) {
-            ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
-          } else {
-            this.out.hsync();
-          }
-          this.syncPolicy.reset();
-        }
-      }
-
-      if (this.rotationPolicy.mark(null, this.offset)) {
-        rotateOutputFile(); // synchronized
-        this.offset = 0;
-        this.rotationPolicy.reset();
-      }
-    }
-  }
-
-  private void initialize(Map config) throws IOException {
-    this.writeLock = new Object();
-    Configuration hdfsConfig = new Configuration();
-    this.fs = FileSystem.get(new Configuration());
-    HdfsSecurityUtil.login(config, hdfsConfig);
-    this.currentFile = createOutputFile();
-    if(this.rotationPolicy instanceof TimedRotationPolicy){
-      long interval = ((TimedRotationPolicy)this.rotationPolicy).getInterval();
-      this.rotationTimer = new Timer(true);
-      TimerTask task = new TimerTask() {
-        @Override
-        public void run() {
-          try {
-            rotateOutputFile();
-          } catch(IOException e){
-            LOG.warn("IOException during scheduled file rotation.", e);
-          }
-        }
-      };
-      this.rotationTimer.scheduleAtFixedRate(task, interval, interval);
-    }
-  }
-
-  protected void rotateOutputFile() throws IOException {
-    LOG.info("Rotating output file...");
-    long start = System.currentTimeMillis();
-    synchronized (this.writeLock) {
-      closeOutputFile();
-      this.rotation++;
-
-      Path newFile = createOutputFile();
-      LOG.info("Performing " +  this.rotationActions.size() + " file rotation actions." );
-      for (RotationAction action : this.rotationActions) {
-        action.execute(this.fs, this.currentFile);
-      }
-      this.currentFile = newFile;
-    }
-    long time = System.currentTimeMillis() - start;
-    LOG.info("File rotation took " + time + " ms.");
-  }
-
-  private Path createOutputFile() throws IOException {
-    Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
-    if(fs.getScheme().equals("file")) {
-      //in the situation where we're running this in a local filesystem, flushing doesn't work.
-      fs.mkdirs(path.getParent());
-      this.out = new FSDataOutputStream(new FileOutputStream(path.toString()), null);
-    }
-    else {
-      this.out = this.fs.create(path);
-    }
-    return path;
-  }
-
-  private void closeOutputFile() throws IOException {
-    this.out.close();
-  }
-
-
-  public void close() {
-    try {
-      closeOutputFile();
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to close output file.", e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh b/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh
new file mode 100755
index 0000000..6824b87
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#     http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+METRON_VERSION=${project.version}
+METRON_HOME=/usr/metron/$METRON_VERSION
+TOPOLOGY_JAR=${project.artifactId}-$METRON_VERSION-uber.jar
+storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/enrichment/remote.yaml --filter $METRON_HOME/config/enrichment.properties

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
index 04415e1..c42c284 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
@@ -21,11 +21,10 @@ import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.common.Constants;
-import org.apache.metron.common.configuration.EnrichmentConfigurations;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
-import org.apache.metron.common.configuration.Configurations;
 import org.apache.metron.common.interfaces.BulkMessageWriter;
+import org.apache.metron.writer.bolt.BulkMessageWriterBolt;
 import org.hamcrest.Description;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
new file mode 100644
index 0000000..6b12264
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.integration;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.*;
+
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.TestConstants;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.converter.EnrichmentHelper;
+import org.apache.metron.integration.*;
+import org.apache.metron.enrichment.integration.components.ConfigUploadComponent;
+import org.apache.metron.integration.utils.TestUtils;
+import org.apache.metron.test.utils.UnitTestHelper;
+import org.apache.metron.integration.components.FluxTopologyComponent;
+import org.apache.metron.integration.components.KafkaWithZKComponent;
+import org.apache.metron.enrichment.integration.mock.MockGeoAdapter;
+import org.apache.metron.test.mock.MockHTable;
+import org.apache.metron.enrichment.lookup.LookupKV;
+
+import org.apache.metron.enrichment.integration.utils.SampleUtil;
+import org.apache.metron.common.utils.JSONUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+public class EnrichmentIntegrationTest extends BaseIntegrationTest {
+
+  private static final String SRC_IP = "ip_src_addr";
+  private static final String DST_IP = "ip_dst_addr";
+  private static final String MALICIOUS_IP_TYPE = "malicious_ip";
+  private static final String PLAYFUL_CLASSIFICATION_TYPE = "playful_classification";
+  private static final Map<String, Object> PLAYFUL_ENRICHMENT = new HashMap<String, Object>() {{
+    put("orientation", "north");
+  }};
+  protected String testSensorType = "test";
+  protected String hdfsDir = "target/enrichmentIntegrationTest/hdfs";
+  protected String fluxPath = "../metron-enrichment/src/main/flux/enrichment/test.yaml";
+  protected String sampleParsedPath = TestConstants.SAMPLE_DATA_PARSED_PATH + "TestExampleParsed";
+  private String sampleIndexedPath = TestConstants.SAMPLE_DATA_INDEXED_PATH + "TestIndexed";
+
+  public static class Provider implements TableProvider, Serializable {
+    MockHTable.Provider  provider = new MockHTable.Provider();
+    @Override
+    public HTableInterface getTable(Configuration config, String tableName) throws IOException {
+      return provider.getTable(config, tableName);
+    }
+  }
+
+
+
+
+
+
+  @Test
+  public void test() throws Exception {
+    final EnrichmentConfigurations configurations = SampleUtil.getSampleEnrichmentConfigs();
+    final List<byte[]> inputMessages = TestUtils.readSampleData(sampleParsedPath);
+    final String cf = "cf";
+    final String trackerHBaseTableName = "tracker";
+    final String threatIntelTableName = "threat_intel";
+    final String enrichmentsTableName = "enrichments";
+    final Properties topologyProperties = new Properties() {{
+      setProperty("org.apache.metron.enrichment.host.known_hosts", "[{\"ip\":\"10.1.128.236\", \"local\":\"YES\", \"type\":\"webserver\", \"asset_value\" : \"important\"},\n" +
+              "{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\" : \"important\"},\n" +
+              "{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"},\n" +
+              "{\"ip\":\"10.0.2.15\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}]");
+      setProperty("hbase.provider.impl","" + Provider.class.getName());
+      setProperty("threat.intel.tracker.table", trackerHBaseTableName);
+      setProperty("threat.intel.tracker.cf", cf);
+      setProperty("threat.intel.simple.hbase.table", threatIntelTableName);
+      setProperty("threat.intel.simple.hbase.cf", cf);
+      setProperty("enrichment.simple.hbase.table", enrichmentsTableName);
+      setProperty("enrichment.simple.hbase.cf", cf);
+      setProperty("enrichment.output.topic", Constants.INDEXING_TOPIC);
+    }};
+    final KafkaWithZKComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaWithZKComponent.Topic>() {{
+      add(new KafkaWithZKComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
+      add(new KafkaWithZKComponent.Topic(Constants.INDEXING_TOPIC, 1));
+    }});
+
+    ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
+            .withTopologyProperties(topologyProperties)
+            .withGlobalConfigsPath(TestConstants.SAMPLE_CONFIG_PATH)
+            .withEnrichmentConfigsPath(TestConstants.SAMPLE_CONFIG_PATH);
+
+    //create MockHBaseTables
+    final MockHTable trackerTable = (MockHTable)MockHTable.Provider.addToCache(trackerHBaseTableName, cf);
+    final MockHTable threatIntelTable = (MockHTable)MockHTable.Provider.addToCache(threatIntelTableName, cf);
+    EnrichmentHelper.INSTANCE.load(threatIntelTable, cf, new ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>(){{
+      add(new LookupKV<>(new EnrichmentKey(MALICIOUS_IP_TYPE, "10.0.2.3"), new EnrichmentValue(new HashMap<>())));
+    }});
+    final MockHTable enrichmentTable = (MockHTable)MockHTable.Provider.addToCache(enrichmentsTableName, cf);
+    EnrichmentHelper.INSTANCE.load(enrichmentTable, cf, new ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>(){{
+      add(new LookupKV<>(new EnrichmentKey(PLAYFUL_CLASSIFICATION_TYPE, "10.0.2.3")
+                        , new EnrichmentValue(PLAYFUL_ENRICHMENT )
+                        )
+         );
+    }});
+    FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
+            .withTopologyLocation(new File(fluxPath))
+            .withTopologyName("test")
+            .withTopologyProperties(topologyProperties)
+            .build();
+
+
+    UnitTestHelper.verboseLogging();
+    ComponentRunner runner = new ComponentRunner.Builder()
+            .withComponent("kafka", kafkaComponent)
+            .withComponent("config", configUploadComponent)
+            .withComponent("storm", fluxComponent)
+            .withMillisecondsBetweenAttempts(15000)
+            .withNumRetries(10)
+            .build();
+    runner.start();
+
+    try {
+      fluxComponent.submitTopology();
+
+      kafkaComponent.writeMessages(Constants.ENRICHMENT_TOPIC, inputMessages);
+      List<Map<String, Object>> docs = runner.process(getProcessor(inputMessages));
+      Assert.assertEquals(inputMessages.size(), docs.size());
+      validateAll(docs);
+    }
+    finally {
+      runner.stop();
+    }
+  }
+
+
+  public static void validateAll(List<Map<String, Object>> docs) {
+
+    for (Map<String, Object> doc : docs) {
+      baseValidation(doc);
+      hostEnrichmentValidation(doc);
+      geoEnrichmentValidation(doc);
+      threatIntelValidation(doc);
+      simpleEnrichmentValidation(doc);
+    }
+  }
+
+  public static void baseValidation(Map<String, Object> jsonDoc) {
+    assertEnrichmentsExists("threatintels.", setOf("hbaseThreatIntel"), jsonDoc.keySet());
+    assertEnrichmentsExists("enrichments.", setOf("geo", "host", "hbaseEnrichment" ), jsonDoc.keySet());
+    for(Map.Entry<String, Object> kv : jsonDoc.entrySet()) {
+      //ensure no values are empty.
+      Assert.assertTrue(kv.getValue().toString().length() > 0);
+    }
+    //ensure we always have a source ip and destination ip
+    Assert.assertNotNull(jsonDoc.get(SRC_IP));
+    Assert.assertNotNull(jsonDoc.get(DST_IP));
+  }
+
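+  /**
+   * Pairs an enriched document with the field (source or destination IP)
+   * whose host enrichments are being evaluated.
+   */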
+  private static class EvaluationPayload {
+    Map<String, Object> indexedDoc;
+    String key;
+    public EvaluationPayload(Map<String, Object> indexedDoc, String key) {
+      this.indexedDoc = indexedDoc;
+      this.key = key;
+    }
+  }
+
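+  /**
+   * Predicates over the known_info attributes (local, asset_value, type)
+   * that the host enrichment attaches under the enrichments.host keys.
+   */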
+  private enum HostEnrichments implements Predicate<EvaluationPayload> {
+
+    LOCAL_LOCATION(new Predicate<EvaluationPayload>() {
+
+      @Override
+      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
+
+        return evaluationPayload.indexedDoc.getOrDefault("enrichments.host." + evaluationPayload.key + ".known_info.local","").equals("YES");
+
+      }
+    })
+
+    ,UNKNOWN_LOCATION(new Predicate<EvaluationPayload>() {
+
+      @Override
+      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
+        return evaluationPayload.indexedDoc.getOrDefault("enrichments.host." + evaluationPayload.key + ".known_info.local","").equals("UNKNOWN");
+      }
+    })
+    ,IMPORTANT(new Predicate<EvaluationPayload>() {
+      @Override
+      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
+        return evaluationPayload.indexedDoc.getOrDefault("enrichments.host." + evaluationPayload.key + ".known_info.asset_value","").equals("important");
+      }
+    })
+    ,PRINTER_TYPE(new Predicate<EvaluationPayload>() {
+      @Override
+      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
+        return evaluationPayload.indexedDoc.getOrDefault("enrichments.host." + evaluationPayload.key + ".known_info.type","").equals("printer");
+      }
+    })
+    ,WEBSERVER_TYPE(new Predicate<EvaluationPayload>() {
+      @Override
+      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
+        return evaluationPayload.indexedDoc.getOrDefault("enrichments.host." + evaluationPayload.key + ".known_info.type","").equals("webserver");
+      }
+    })
+    ,UNKNOWN_TYPE(new Predicate<EvaluationPayload>() {
+      @Override
+      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
+        return evaluationPayload.indexedDoc.getOrDefault("enrichments.host." + evaluationPayload.key + ".known_info.type","").equals("unknown");
+      }
+    })
+    ;
+
+    Predicate<EvaluationPayload> _predicate;
+    HostEnrichments(Predicate<EvaluationPayload> predicate) {
+      this._predicate = predicate;
+    }
+
+    @Override
+    public boolean apply(EvaluationPayload payload) {
+      return _predicate.apply(payload);
+    }
+
+  }
+
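+  /**
+   * Asserts that every key under the given prefix belongs to one of the
+   * expected enrichment names.
+   */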
+  private static void assertEnrichmentsExists(String topLevel, Set<String> expectedEnrichments, Set<String> keys) {
+    for(String key : keys) {
+      if(key.startsWith(topLevel)) {
+        String secondLevel = Iterables.get(Splitter.on(".").split(key), 1);
+        String message = "Found an enrichment/threat intel (" + secondLevel + ") that I didn't expect (expected enrichments :"
+                       + Joiner.on(",").join(expectedEnrichments) + "), but it was not there.  If you've created a new"
+                       + " enrichment, then please add a validation method to this unit test.  Otherwise, it's a solid error"
+                       + " and should be investigated.";
+        Assert.assertTrue( message, expectedEnrichments.contains(secondLevel));
+      }
+    }
+  }
+  private static void simpleEnrichmentValidation(Map<String, Object> indexedDoc) {
+    if(indexedDoc.getOrDefault(SRC_IP,"").equals("10.0.2.3")
+            || indexedDoc.getOrDefault(DST_IP,"").equals("10.0.2.3")
+            ) {
+      Assert.assertTrue(keyPatternExists("enrichments.hbaseEnrichment", indexedDoc));
+      if(indexedDoc.getOrDefault(SRC_IP,"").equals("10.0.2.3")) {
+        Assert.assertEquals(indexedDoc.get("enrichments.hbaseEnrichment." + SRC_IP + "." + PLAYFUL_CLASSIFICATION_TYPE+ ".orientation")
+                , PLAYFUL_ENRICHMENT.get("orientation")
+        );
+      }
+      else if(indexedDoc.getOrDefault(DST_IP,"").equals("10.0.2.3")) {
+        Assert.assertEquals( indexedDoc.get("enrichments.hbaseEnrichment." + DST_IP + "." + PLAYFUL_CLASSIFICATION_TYPE + ".orientation")
+                , PLAYFUL_ENRICHMENT.get("orientation")
+        );
+      }
+    }
+
+  }
+  private static void threatIntelValidation(Map<String, Object> indexedDoc) {
+    if(indexedDoc.getOrDefault(SRC_IP,"").equals("10.0.2.3")
+    || indexedDoc.getOrDefault(DST_IP,"").equals("10.0.2.3")
+            ) {
+      //any message that hits threat intel should be tagged with is_alert = true
+      Assert.assertTrue(keyPatternExists("threatintels.", indexedDoc));
+      Assert.assertTrue(indexedDoc.containsKey("threat.triage.level"));
+      Assert.assertEquals(indexedDoc.getOrDefault("is_alert",""), "true");
+      Assert.assertEquals((double)indexedDoc.get("threat.triage.level"), 10d, 1e-7);
+    }
+    else {
+      //This holds for the YAF sample data; if snort data is added later, this assumption breaks.
+      Assert.assertNull(indexedDoc.get("is_alert"));
+      Assert.assertFalse(keyPatternExists("threatintels.", indexedDoc));
+    }
+    //ip threat intels
+    if(keyPatternExists("threatintels.hbaseThreatIntel.", indexedDoc)) {
+      if(indexedDoc.getOrDefault(SRC_IP,"").equals("10.0.2.3")) {
+        Assert.assertEquals(indexedDoc.get("threatintels.hbaseThreatIntel." + SRC_IP + "." + MALICIOUS_IP_TYPE), "alert");
+      }
+      else if(indexedDoc.getOrDefault(DST_IP,"").equals("10.0.2.3")) {
+        Assert.assertEquals(indexedDoc.get("threatintels.hbaseThreatIntel." + DST_IP + "." + MALICIOUS_IP_TYPE), "alert");
+      }
+      else {
+        Assert.fail("There was a threat intels that I did not expect: " + indexedDoc);
+      }
+    }
+
+  }
+
+  private static void geoEnrichmentValidation(Map<String, Object> indexedDoc) {
+    //should have geo enrichment on every message due to mock geo adapter
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".location_point"), MockGeoAdapter.DEFAULT_LOCATION_POINT);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP +".location_point"), MockGeoAdapter.DEFAULT_LOCATION_POINT);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".longitude"), MockGeoAdapter.DEFAULT_LONGITUDE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".longitude"), MockGeoAdapter.DEFAULT_LONGITUDE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".city"), MockGeoAdapter.DEFAULT_CITY);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".city"), MockGeoAdapter.DEFAULT_CITY);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".latitude"), MockGeoAdapter.DEFAULT_LATITUDE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".latitude"), MockGeoAdapter.DEFAULT_LATITUDE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".country"), MockGeoAdapter.DEFAULT_COUNTRY);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".country"), MockGeoAdapter.DEFAULT_COUNTRY);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".dmaCode"), MockGeoAdapter.DEFAULT_DMACODE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".dmaCode"), MockGeoAdapter.DEFAULT_DMACODE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".postalCode"), MockGeoAdapter.DEFAULT_POSTAL_CODE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".postalCode"), MockGeoAdapter.DEFAULT_POSTAL_CODE);
+  }
+
+  private static void hostEnrichmentValidation(Map<String, Object> indexedDoc) {
+    boolean enriched = false;
+    //important local printers
+    {
+      Set<String> ips = setOf("10.0.2.15", "10.60.10.254");
+      if (ips.contains(indexedDoc.get(SRC_IP))) {
+        //this is a local, important printer
+        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
+                ,HostEnrichments.IMPORTANT
+                ,HostEnrichments.PRINTER_TYPE
+                ).apply(new EvaluationPayload(indexedDoc, SRC_IP))
+        );
+        enriched = true;
+      }
+      if (ips.contains(indexedDoc.get(DST_IP))) {
+        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
+                ,HostEnrichments.IMPORTANT
+                ,HostEnrichments.PRINTER_TYPE
+                ).apply(new EvaluationPayload(indexedDoc, DST_IP))
+        );
+        enriched = true;
+      }
+    }
+    //important local webservers
+    {
+      Set<String> ips = setOf("10.1.128.236");
+      if (ips.contains(indexedDoc.get(SRC_IP))) {
+        //this is a local, important webserver
+        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
+                ,HostEnrichments.IMPORTANT
+                ,HostEnrichments.WEBSERVER_TYPE
+                ).apply(new EvaluationPayload(indexedDoc, SRC_IP))
+        );
+        enriched = true;
+      }
+      if (ips.contains(indexedDoc.get(DST_IP))) {
+        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
+                ,HostEnrichments.IMPORTANT
+                ,HostEnrichments.WEBSERVER_TYPE
+                ).apply(new EvaluationPayload(indexedDoc, DST_IP))
+        );
+        enriched = true;
+      }
+    }
+    if(!enriched) {
+      Assert.assertFalse(keyPatternExists("enrichments.host", indexedDoc));
+    }
+  }
+
+
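+  /** Returns true if any key in the document starts with the given prefix. */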
+  private static boolean keyPatternExists(String pattern, Map<String, Object> indexedObj) {
+    for(String k : indexedObj.keySet()) {
+      if(k.startsWith(pattern)) {
+        return true;
+      }
+    }
+    return false;
+  }
+  private static Set<String> setOf(String... items) {
+    Set<String> ret = new HashSet<>();
+    for(String item : items) {
+      ret.add(item);
+    }
+    return ret;
+  }
+
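+  /**
+   * Polls the indexing topic until as many messages have arrived as were
+   * written to the enrichment topic, then deserializes each one into a Map
+   * for validation.
+   */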
+  public Processor<List<Map<String, Object>>> getProcessor(List<byte[]> inputMessages) {
+    return new Processor<List<Map<String, Object>>>() {
+      List<Map<String, Object>> docs = null;
+
+      public ReadinessState process(ComponentRunner runner) {
+        KafkaWithZKComponent kafkaComponent = runner.getComponent("kafka", KafkaWithZKComponent.class);
+        List<byte[]> messages = kafkaComponent.readMessages(Constants.INDEXING_TOPIC);
+        if (messages.size() == inputMessages.size()) {
+          docs = new ArrayList<>();
+          for(byte[] message : messages) {
+            try {
+              docs.add(JSONUtils.INSTANCE.load(new String(message), new TypeReference<Map<String, Object>>() {}));
+            } catch (IOException e) {
+              throw new IllegalStateException(e.getMessage(), e);
+            }
+          }
+          return ReadinessState.READY;
+        } else {
+          return ReadinessState.NOT_READY;
+        }
+      }
+
+      public List<Map<String, Object>> getResult() {
+        return docs;
+      }
+    };
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
new file mode 100644
index 0000000..5cc14ac
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.integration.components;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.integration.InMemoryComponent;
+import org.apache.metron.integration.UnableToStartException;
+import org.apache.metron.integration.components.KafkaWithZKComponent;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
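+/**
+ * In-memory component that seeds ZooKeeper with global, parser and
+ * enrichment configurations before the topology under test starts.
+ */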
+public class ConfigUploadComponent implements InMemoryComponent {
+
+  private Properties topologyProperties;
+  private String globalConfigPath;
+  private String parserConfigsPath;
+  private String enrichmentConfigsPath;
+  private Optional<String> globalConfig = Optional.empty();
+  private Map<String, SensorParserConfig> parserSensorConfigs = new HashMap<>();
+
+  public ConfigUploadComponent withTopologyProperties(Properties topologyProperties) {
+    this.topologyProperties = topologyProperties;
+    return this;
+  }
+
+  public ConfigUploadComponent withGlobalConfigsPath(String globalConfigPath) {
+    this.globalConfigPath = globalConfigPath;
+    return this;
+  }
+
+  public ConfigUploadComponent withParserConfigsPath(String parserConfigsPath) {
+    this.parserConfigsPath = parserConfigsPath;
+    return this;
+  }
+
+  public ConfigUploadComponent withEnrichmentConfigsPath(String enrichmentConfigsPath) {
+    this.enrichmentConfigsPath = enrichmentConfigsPath;
+    return this;
+  }
+
+  public ConfigUploadComponent withParserSensorConfig(String name, SensorParserConfig config) {
+    parserSensorConfigs.put(name, config);
+    return this;
+  }
+
+  public ConfigUploadComponent withGlobalConfig(String globalConfig) {
+    this.globalConfig = Optional.ofNullable(globalConfig);
+    return this;
+  }
+
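+  /**
+   * Uploads any configured paths and in-memory configs to the ZooKeeper
+   * quorum named by the topology properties.
+   */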
+  @Override
+  public void start() throws UnableToStartException {
+    try {
+      if(globalConfigPath != null) {
+        ConfigurationsUtils.uploadConfigsToZookeeper(globalConfigPath
+                , parserConfigsPath
+                , enrichmentConfigsPath
+                , topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY)
+        );
+      }
+      for(Map.Entry<String, SensorParserConfig> kv : parserSensorConfigs.entrySet()) {
+        ConfigurationsUtils.writeSensorParserConfigToZookeeper( kv.getKey()
+                                                              , kv.getValue()
+                                                              , topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY)
+                                                              );
+      }
+      if(globalConfig.isPresent()) {
+        ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig.get().getBytes()
+                                                        , topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY)
+                                                        );
+      }
+
+    } catch (Exception e) {
+      throw new UnableToStartException(e.getMessage(), e);
+    }
+  }
+
+  public SensorParserConfig getSensorParserConfig(String sensorType) {
+    SensorParserConfig sensorParserConfig = new SensorParserConfig();
+    CuratorFramework client = ConfigurationsUtils.getClient(topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY));
+    client.start();
+    try {
+      sensorParserConfig = ConfigurationsUtils.readSensorParserConfigFromZookeeper(sensorType, client);
+    } catch (Exception e) {
+      throw new IllegalStateException("Unable to read parser config for sensor " + sensorType, e);
+    } finally {
+      client.close();
+    }
+    return sensorParserConfig;
+  }
+
+  @Override
+  public void stop() {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/mock/MockGeoAdapter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/mock/MockGeoAdapter.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/mock/MockGeoAdapter.java
new file mode 100644
index 0000000..14a4493
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/mock/MockGeoAdapter.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.integration.mock;
+
+import com.google.common.base.Joiner;
+import org.apache.metron.enrichment.bolt.CacheKey;
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.json.simple.JSONObject;
+
+import java.io.Serializable;
+
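+/**
+ * Geo enrichment adapter that returns the same fixed location for every
+ * message, giving the integration tests deterministic geo fields to assert.
+ */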
+public class MockGeoAdapter implements EnrichmentAdapter<CacheKey>,
+        Serializable {
+
+  public static final String DEFAULT_LOC_ID = "1";
+  public static final String DEFAULT_COUNTRY = "test country";
+  public static final String DEFAULT_CITY = "test city";
+  public static final String DEFAULT_POSTAL_CODE = "test postalCode";
+  public static final String DEFAULT_LATITUDE = "test latitude";
+  public static final String DEFAULT_LONGITUDE = "test longitude";
+  public static final String DEFAULT_DMACODE= "test dmaCode";
+  public static final String DEFAULT_LOCATION_POINT= Joiner.on(',').join(DEFAULT_LATITUDE,DEFAULT_LONGITUDE);
+
+  @Override
+  public void logAccess(CacheKey value) {
+
+  }
+
+  @Override
+  public JSONObject enrich(CacheKey cache) {
+    JSONObject enriched = new JSONObject();
+    enriched.put("locID", DEFAULT_LOC_ID);
+    enriched.put("country", DEFAULT_COUNTRY);
+    enriched.put("city", DEFAULT_CITY);
+    enriched.put("postalCode", DEFAULT_POSTAL_CODE);
+    enriched.put("latitude", DEFAULT_LATITUDE);
+    enriched.put("longitude", DEFAULT_LONGITUDE);
+    enriched.put("dmaCode", DEFAULT_DMACODE);
+    enriched.put("location_point", DEFAULT_LOCATION_POINT);
+    return enriched;
+  }
+
+  @Override
+  public boolean initializeAdapter() {
+    return true;
+  }
+
+  @Override
+  public void cleanup() {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/mock/MockHBaseConnector.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/mock/MockHBaseConnector.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/mock/MockHBaseConnector.java
new file mode 100644
index 0000000..3b16b6a
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/mock/MockHBaseConnector.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.integration.mock;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.metron.hbase.Connector;
+import org.apache.metron.hbase.TupleTableConfig;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
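+/**
+ * HBase Connector that records Puts in a static in-memory list instead of
+ * writing to a cluster; tests can inspect or clear the captured Puts.
+ */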
+public class MockHBaseConnector extends Connector {
+    private static final List<Put> puts = Collections.synchronizedList(new ArrayList<Put>());
+    public MockHBaseConnector(TupleTableConfig conf, String _quorum, String _port) throws IOException {
+        super(conf, _quorum, _port);
+    }
+
+    @Override
+    public void put(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
+        puts.add(put);
+    }
+
+    @Override
+    public void close() {
+
+    }
+    public static void clear() {
+        puts.clear();
+    }
+    public static List<Put> getPuts() {
+        return puts;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/mock/MockTableProvider.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/mock/MockTableProvider.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/mock/MockTableProvider.java
new file mode 100644
index 0000000..ac2904a
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/mock/MockTableProvider.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.enrichment.integration.mock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.test.mock.MockHTable;
+
+import java.io.IOException;
+import java.io.Serializable;
+
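+/**
+ * TableProvider backed by the MockHTable cache; tables must be registered
+ * via addTable() before they can be fetched.
+ */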
+public class MockTableProvider implements TableProvider, Serializable {
+  static MockHTable.Provider provider = new MockHTable.Provider();
+  @Override
+  public HTableInterface getTable(Configuration config, String tableName) throws IOException {
+    return provider.getTable(config, tableName);
+  }
+  public static void addTable(String tableName, String... cf) {
+    provider.addToCache(tableName, cf);
+  }
+  public static MockHTable getTable(String tableName) {
+    try {
+      return (MockHTable) provider.getTable(null, tableName);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to get table: " + tableName);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/utils/SampleUtil.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/utils/SampleUtil.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/utils/SampleUtil.java
new file mode 100644
index 0000000..40dcfe4
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/utils/SampleUtil.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.integration.utils;
+
+import org.apache.metron.TestConstants;
+import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.common.configuration.ParserConfigurations;
+
+import java.io.IOException;
+import java.util.Map;
+
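+/**
+ * Convenience factory for Configurations objects pre-loaded with the sample
+ * global, parser and enrichment configs shipped in test resources.
+ */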
+public class SampleUtil {
+
+  public static Configurations getSampleConfigs() throws IOException {
+    Configurations configurations = new Configurations();
+    configurations.updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(TestConstants.SAMPLE_CONFIG_PATH));
+    return configurations;
+  }
+
+  public static ParserConfigurations getSampleParserConfigs() throws IOException {
+    ParserConfigurations configurations = new ParserConfigurations();
+    configurations.updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(TestConstants.SAMPLE_CONFIG_PATH));
+    Map<String, byte[]> sensorParserConfigs = ConfigurationsUtils.readSensorParserConfigsFromFile(TestConstants.PARSER_CONFIGS_PATH);
+    for(String sensorType: sensorParserConfigs.keySet()) {
+      configurations.updateSensorParserConfig(sensorType, sensorParserConfigs.get(sensorType));
+    }
+    return configurations;
+  }
+
+  public static EnrichmentConfigurations getSampleEnrichmentConfigs() throws IOException {
+    EnrichmentConfigurations configurations = new EnrichmentConfigurations();
+    configurations.updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(TestConstants.SAMPLE_CONFIG_PATH));
+    Map<String, byte[]> sensorEnrichmentConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(TestConstants.SAMPLE_CONFIG_PATH);
+    for(String sensorType: sensorEnrichmentConfigs.keySet()) {
+      configurations.updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfigs.get(sensorType));
+    }
+    return configurations;
+  }
+
+}


