metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmerri...@apache.org
Subject [27/51] [partial] incubator-metron git commit: METRON-113 Project Reorganization (merrimanr) closes apache/incubator-metron#88
Date Tue, 26 Apr 2016 14:46:15 GMT
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/HBaseBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/HBaseBolt.java
new file mode 100644
index 0000000..1eff028
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/HBaseBolt.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.bolt;
+
+
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.log4j.Logger;
+import org.apache.metron.hbase.Connector;
+import org.apache.metron.hbase.HTableConnector;
+import org.apache.metron.hbase.TupleTableConfig;
+import org.json.simple.JSONObject;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import org.apache.metron.common.utils.ErrorUtils;
+
+/**
+ * A Storm bolt for putting data into HBase.
+ * <p>
+ * By default works in batch mode by enabling HBase's client-side write buffer. Enabling batch mode
+ * is recommended for high throughput, but it can be disabled in {@link TupleTableConfig}.
+ * <p>
+ * The HBase configuration is picked up from the first <tt>hbase-site.xml</tt> encountered in the
+ * classpath.
+ * <p>
+ * A failed write is reported on the <tt>error</tt> stream and then rethrown as a
+ * {@link RuntimeException}; successful writes are acked when {@link #isAutoAck()} is true.
+ * @see TupleTableConfig
+ * @see HTableConnector
+ */
+@SuppressWarnings("serial")
+public class HBaseBolt implements IRichBolt {
+  private static final Logger LOG = Logger.getLogger(HBaseBolt.class);
+  /** Port reported when a ZooKeeper connect string does not name one. */
+  private static final String DEFAULT_ZK_PORT = "2181";
+
+  protected OutputCollector collector;
+  protected TupleTableConfig conf;
+  protected boolean autoAck = true;
+  protected Connector connector;
+  private String _quorum;
+  private String _port;
+
+  /**
+   * @param conf   table/column configuration used to build each {@link Put}
+   * @param quorum ZooKeeper hosts, handed unchanged to {@link HTableConnector}
+   * @param port   ZooKeeper port, handed unchanged to {@link HTableConnector}
+   */
+  public HBaseBolt(TupleTableConfig conf, String quorum, String port) {
+    this.conf = conf;
+    _quorum = quorum;
+    _port = port;
+  }
+
+  /**
+   * Convenience constructor that derives quorum and port from a connect string of the form
+   * <tt>host1:port,host2:port,...</tt>.
+   *
+   * @param conf            table/column configuration used to build each {@link Put}
+   * @param zkConnectString comma-separated <tt>host[:port]</tt> entries
+   * @throws IOException declared for backward compatibility; nothing in this constructor throws it
+   */
+  public HBaseBolt(final TupleTableConfig conf, String zkConnectString) throws IOException {
+    this(conf, zkConnectStringToHosts(zkConnectString), zkConnectStringToPort(zkConnectString));
+  }
+
+  /**
+   * Strips the port from every entry of a ZooKeeper connect string.
+   *
+   * @param connString comma-separated <tt>host[:port]</tt> entries
+   * @return the hosts, comma-separated, in their original order
+   */
+  public static String zkConnectStringToHosts(String connString) {
+    Iterable<String> hostPortPairs = Splitter.on(',').split(connString);
+    return Joiner.on(',').join(Iterables.transform(hostPortPairs, new Function<String, String>() {
+
+      @Override
+      public String apply(String hostPortPair) {
+        return Iterables.getFirst(Splitter.on(':').split(hostPortPair), "");
+      }
+    }));
+  }
+
+  /**
+   * Extracts the port of the first entry of a ZooKeeper connect string.
+   *
+   * @param connString comma-separated <tt>host[:port]</tt> entries
+   * @return the text after the last <tt>:</tt> of the first entry, or {@value #DEFAULT_ZK_PORT}
+   *         when that entry has no <tt>:</tt> or nothing follows it
+   */
+  public static String zkConnectStringToPort(String connString) {
+    String firstHostPort = Iterables.getFirst(Splitter.on(',').split(connString), "");
+    // Splitting on ':' and taking the last token would hand back the host itself for a
+    // port-less entry, so locate the separator explicitly instead.
+    int separator = firstHostPort.lastIndexOf(':');
+    if (separator < 0 || separator == firstHostPort.length() - 1) {
+      return DEFAULT_ZK_PORT;
+    }
+    return firstHostPort.substring(separator + 1);
+  }
+
+  /**
+   * Registers the configured columns and opens the connector used by {@link #execute(Tuple)}.
+   *
+   * @return a new {@link HTableConnector} for the configured table, quorum and port
+   * @throws IOException declared by the {@link HTableConnector} constructor
+   */
+  public Connector createConnector() throws IOException{
+    initialize();
+    return new HTableConnector(conf, _quorum, _port);
+  }
+
+  /**
+   * Parses {@code conf.getFields()} and registers every column on the config.
+   * <p>
+   * The expected format is <tt>family:qualifier[,qualifier...]</tt> entries separated by
+   * <tt>|</tt>. A null field specification registers nothing.
+   *
+   * @throws IllegalArgumentException if an entry has no <tt>:qualifier</tt> part
+   */
+  public void initialize() {
+    // StringUtils.split takes a set of separator characters, not a regex, so a plain "|" is the
+    // correct argument; an escaped "\\|" would additionally split on backslashes.
+    String[] familySpecs = StringUtils.split(conf.getFields(), "|");
+    if (familySpecs == null) {
+      return;
+    }
+    for (String familySpec : familySpecs) {
+      String[] cfCqTokens = StringUtils.split(familySpec, ":");
+      if (cfCqTokens.length < 2) {
+        throw new IllegalArgumentException("Malformed column specification '" + familySpec
+                + "'; expected <family>:<qualifier>[,<qualifier>...]");
+      }
+      String columnFamily = cfCqTokens[0];
+      for (String columnQualifier : StringUtils.split(cfCqTokens[1], ",")) {
+        conf.addColumn(columnFamily, columnQualifier);
+      }
+      // NOTE(review): this re-enables autoAck whenever at least one family is configured, which
+      // overrides an earlier setAutoAck(false) -- kept as-is, confirm it is intended.
+      setAutoAck(true);
+    }
+  }
+
+  /**
+   * Stores the collector and, unless a connector was already supplied, creates one.
+   *
+   * @throws RuntimeException wrapping any {@link IOException} raised while creating the connector
+   */
+  @SuppressWarnings("rawtypes")
+  @Override
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    this.collector = collector;
+
+    try {
+      if(connector == null) {
+        this.connector = createConnector();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    LOG.info("Preparing HBaseBolt for table: " + this.conf.getTableName());
+  }
+
+  /**
+   * Converts the tuple to a {@link Put} and writes it through the connector.
+   * <p>
+   * On {@link IOException} an error message is emitted on the <tt>error</tt> stream and the
+   * failure is rethrown unchecked; the tuple is not acked in that case.
+   */
+  @Override
+  public void execute(Tuple input) {
+    try {
+      Put p = conf.getPutFromTuple(input);
+      this.connector.put(p);
+    } catch (IOException ex) {
+      JSONObject error = ErrorUtils.generateErrorMessage(
+              "Alerts problem: " + input.toString(), ex);
+      collector.emit("error", new Values(error));
+
+      throw new RuntimeException(ex);
+    }
+
+    if (this.autoAck) {
+      this.collector.ack(input);
+    }
+  }
+
+  /** Closes the connector, if one was ever created. */
+  @Override
+  public void cleanup() {
+    // prepare() may have failed before a connector existed; do not mask that with an NPE here.
+    if (this.connector != null) {
+      this.connector.close();
+    }
+  }
+
+  /** Declares the <tt>error</tt> stream with its single <tt>HBase</tt> field. */
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declareStream("error", new Fields("HBase"));
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    return null;
+  }
+
+  /**
+   * @return the autoAck
+   */
+  public boolean isAutoAck() {
+    return autoAck;
+  }
+
+  /**
+   * @param autoAck the autoAck to set
+   */
+  public void setAutoAck(boolean autoAck) {
+    this.autoAck = autoAck;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
new file mode 100644
index 0000000..68e56ed
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import com.google.common.base.Joiner;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Sets;
+import org.apache.metron.common.bolt.ConfiguredBolt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base bolt that collects the per-stream parts of a message under a shared key and emits the
+ * joined result on the <tt>message</tt> stream once a part from every expected stream is present.
+ * <p>
+ * Partial joins are held in a size- and time-bounded Guava cache; both bounds must be set through
+ * {@link #withMaxCacheSize(long)} and {@link #withMaxTimeRetain(long)} before the bolt is prepared.
+ *
+ * @param <V> type of the message parts being joined
+ */
+public abstract class JoinBolt<V> extends ConfiguredBolt {
+
+  private static final Logger LOG = LoggerFactory
+          .getLogger(JoinBolt.class);
+  protected OutputCollector collector;
+
+  protected transient CacheLoader<String, Map<String, V>> loader;
+  protected transient LoadingCache<String, Map<String, V>> cache;
+  protected Long maxCacheSize;
+  protected Long maxTimeRetain;
+
+  public JoinBolt(String zookeeperUrl) {
+    super(zookeeperUrl);
+  }
+
+  /**
+   * @param maxCacheSize maximum number of keys held in the join cache
+   * @return this bolt, for chaining
+   */
+  public JoinBolt withMaxCacheSize(long maxCacheSize) {
+    this.maxCacheSize = maxCacheSize;
+    return this;
+  }
+
+  /**
+   * @param maxTimeRetain minutes after the last write before a cached partial join expires
+   * @return this bolt, for chaining
+   */
+  public JoinBolt withMaxTimeRetain(long maxTimeRetain) {
+    this.maxTimeRetain = maxTimeRetain;
+    return this;
+  }
+
+  /**
+   * Builds the join cache and then delegates to {@link #prepare(Map, TopologyContext)}.
+   *
+   * @throws IllegalStateException if either cache bound was never configured
+   */
+  @Override
+  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+    super.prepare(map, topologyContext, outputCollector);
+    this.collector = outputCollector;
+    if (this.maxCacheSize == null) {
+      throw new IllegalStateException("maxCacheSize must be specified");
+    }
+    if (this.maxTimeRetain == null) {
+      throw new IllegalStateException("maxTimeRetain must be specified");
+    }
+    // Every unseen key starts out with an empty, mutable stream -> part map.
+    loader = new CacheLoader<String, Map<String, V>>() {
+      @Override
+      public Map<String, V> load(String key) throws Exception {
+        return new HashMap<>();
+      }
+    };
+    cache = CacheBuilder.newBuilder()
+            .maximumSize(maxCacheSize)
+            .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES)
+            .build(loader);
+    prepare(map, topologyContext);
+  }
+
+  /**
+   * Records the incoming part under its source stream id and, when the set of recorded streams
+   * equals {@link #getStreamIds(Object)}, emits the joined message, acks the tuple and drops the
+   * cache entry.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void execute(Tuple tuple) {
+    final String sourceStream = tuple.getSourceStreamId();
+    final String key = (String) tuple.getValueByField("key");
+    final V part = (V) tuple.getValueByField("message");
+    try {
+      Map<String, V> partsByStream = cache.get(key);
+      if (partsByStream.containsKey(sourceStream)) {
+        LOG.warn(String.format("Received key %s twice for stream %s", key, sourceStream));
+      }
+      partsByStream.put(sourceStream, part);
+
+      Set<String> expectedStreams = getStreamIds(part);
+      Set<String> arrivedStreams = partsByStream.keySet();
+      boolean joinComplete = arrivedStreams.size() == expectedStreams.size()
+              && Sets.symmetricDifference(arrivedStreams, expectedStreams).isEmpty();
+
+      if (!joinComplete) {
+        // Written back explicitly; presumably this restarts the expireAfterWrite clock.
+        // NOTE(review): the tuple is neither acked nor failed on this path -- confirm intended.
+        cache.put(key, partsByStream);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(getClass().getSimpleName() + ": Missed joining portions for " + key
+                  + ". Expected " + Joiner.on(",").join(expectedStreams)
+                  + " != " + Joiner.on(",").join(arrivedStreams));
+        }
+        return;
+      }
+
+      V joined = joinMessages(partsByStream);
+      collector.emit("message", tuple, new Values(key, joined));
+      collector.ack(tuple);
+      cache.invalidate(key);
+    } catch (ExecutionException e) {
+      // NOTE(review): the tuple is neither acked nor failed here either.
+      collector.reportError(e);
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  /** Declares the <tt>message</tt> stream carrying <tt>key</tt> and <tt>message</tt>. */
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declareStream("message", new Fields("key", "message"));
+  }
+
+  /** Subclass hook invoked at the end of the Storm prepare call, after the cache exists. */
+  public abstract void prepare(Map map, TopologyContext topologyContext);
+
+  /**
+   * @param value the part that has just arrived
+   * @return ids of all streams that must contribute before the join is complete
+   */
+  public abstract Set<String> getStreamIds(V value);
+
+  /**
+   * @param streamMessageMap the collected parts, keyed by source stream id
+   * @return the joined message to emit
+   */
+  public abstract V joinMessages(Map<String, V> streamMessageMap);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
new file mode 100644
index 0000000..4ff387c
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.metron.common.bolt.ConfiguredBolt;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Base bolt that turns each incoming tuple into a message, emits that message on the
+ * <tt>message</tt> stream and fans its parts out to one stream per id in {@link #getStreamIds()}.
+ *
+ * @param <T> type of the generated message and of its per-stream parts
+ */
+public abstract class SplitBolt<T> extends ConfiguredBolt {
+
+  protected OutputCollector collector;
+
+  public SplitBolt(String zookeeperUrl) {
+    super(zookeeperUrl);
+  }
+
+  /** Stores the collector, then delegates to {@link #prepare(Map, TopologyContext)}. */
+  @Override
+  public final void prepare(Map map, TopologyContext topologyContext,
+                            OutputCollector outputCollector) {
+    super.prepare(map, topologyContext, outputCollector);
+    this.collector = outputCollector;
+    prepare(map, topologyContext);
+  }
+
+  /** Generates a message from the tuple and hands both to {@link #emit(Tuple, Object)}. */
+  @Override
+  public final void execute(Tuple tuple) {
+    T generated = generateMessage(tuple);
+    emit(tuple, generated);
+  }
+
+  /**
+   * Declares <tt>message</tt>, one stream per id from {@link #getStreamIds()}, and
+   * <tt>error</tt>, then lets the subclass add more through
+   * {@link #declareOther(OutputFieldsDeclarer)}.
+   */
+  @Override
+  public final void declareOutputFields(OutputFieldsDeclarer declarer) {
+    final Fields keyAndMessage = new Fields("key", "message");
+    declarer.declareStream("message", keyAndMessage);
+    for (String streamId : getStreamIds()) {
+      declarer.declareStream(streamId, new Fields("key", "message"));
+    }
+    declarer.declareStream("error", new Fields("message"));
+    declareOther(declarer);
+  }
+
+  /**
+   * Emits the whole message anchored to {@code tuple}, emits each split part unanchored on its
+   * own stream, acks the tuple and finally calls {@link #emitOther(Tuple, Object)}.
+   * A null message is ignored entirely (nothing is emitted or acked).
+   *
+   * @param tuple   the tuple the message was generated from
+   * @param message the generated message, may be null
+   */
+  public void emit(Tuple tuple, T message) {
+    if (message == null) {
+      return;
+    }
+    final String key = getKey(tuple, message);
+    collector.emit("message", tuple, new Values(key, message));
+    for (Map.Entry<String, T> part : splitMessage(message).entrySet()) {
+      final String streamId = part.getKey();
+      T payload = part.getValue();
+      if (payload == null) {
+        payload = getDefaultMessage(streamId);
+      }
+      collector.emit(streamId, new Values(key, payload));
+    }
+    // The ack deliberately precedes emitOther, as in the original ordering.
+    collector.ack(tuple);
+    emitOther(tuple, message);
+  }
+
+  /**
+   * Supplies the payload for a stream whose split part is null. This default refuses to.
+   *
+   * @throws IllegalArgumentException always, unless overridden
+   */
+  protected T getDefaultMessage(String streamId) {
+    throw new IllegalArgumentException("Could not find a message for" +
+            " stream: " + streamId);
+  }
+
+  /** Subclass hook invoked at the end of the Storm prepare call. */
+  public abstract void prepare(Map map, TopologyContext topologyContext);
+
+  /** @return ids of the per-part output streams to declare */
+  public abstract Set<String> getStreamIds();
+
+  /** @return the key emitted alongside the message and each of its parts */
+  public abstract String getKey(Tuple tuple, T message);
+
+  /** @return the message for this tuple, or null to skip the tuple */
+  public abstract T generateMessage(Tuple tuple);
+
+  /** @return the parts of {@code message}, keyed by the stream each is emitted on */
+  public abstract Map<String, T> splitMessage(T message);
+
+  /** Subclass hook for declaring additional output streams. */
+  public abstract void declareOther(OutputFieldsDeclarer declarer);
+
+  /** Subclass hook invoked after the tuple has been acked. */
+  public abstract void emitOther(Tuple tuple, T message);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
new file mode 100644
index 0000000..a2b0e78
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.bolt;
+
+import org.apache.metron.common.configuration.SensorEnrichmentConfig;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Join bolt for the threat-intel stage: it uses the sensor's threat-intel field map and marks a
+ * joined message as an alert when it carries any threat-intel result field.
+ */
+public class ThreatIntelJoinBolt extends EnrichmentJoinBolt {
+
+  protected static final Logger LOG = LoggerFactory
+          .getLogger(ThreatIntelJoinBolt.class);
+
+  public ThreatIntelJoinBolt(String zookeeperUrl) {
+    super(zookeeperUrl);
+  }
+
+  /**
+   * @param sourceType sensor type whose enrichment config is looked up
+   * @return that sensor's threat-intel field map, or null (after logging an error) when the
+   *         sensor has no enrichment config
+   */
+  @Override
+  public Map<String, List<String>> getFieldMap(String sourceType) {
+    SensorEnrichmentConfig sensorConfig = configurations.getSensorEnrichmentConfig(sourceType);
+    if (sensorConfig == null) {
+      LOG.error("Unable to retrieve sensor config: " + sourceType);
+      return null;
+    }
+    return sensorConfig.getThreatIntelFieldMap();
+  }
+
+  /**
+   * Joins the parts as the parent does, then sets <tt>is_alert</tt> to <tt>"true"</tt> if any key
+   * starts with <tt>threatintels</tt> and does not end with <tt>.ts</tt>.
+   */
+  @Override
+  public JSONObject joinMessages(Map<String, JSONObject> streamMessageMap) {
+    JSONObject joined = super.joinMessages(streamMessageMap);
+    boolean hasThreatIntelHit = false;
+    for (Object rawKey : joined.keySet()) {
+      String fieldName = rawKey.toString();
+      if (fieldName.startsWith("threatintels") && !fieldName.endsWith(".ts")) {
+        hasThreatIntelHit = true;
+        break;
+      }
+    }
+    if (hasThreatIntelHit) {
+      joined.put("is_alert" , "true");
+    }
+    return joined;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java
new file mode 100644
index 0000000..692c327
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.bolt;
+
+import org.apache.metron.enrichment.utils.ThreatIntelUtils;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Splitter bolt for the threat-intel stage: it differs from its parent only in which field map
+ * it reads and in how it names the keys of the split fields.
+ */
+public class ThreatIntelSplitterBolt extends EnrichmentSplitterBolt {
+
+  public ThreatIntelSplitterBolt(String zookeeperUrl) {
+    super(zookeeperUrl);
+  }
+
+  /** Builds the key through {@link ThreatIntelUtils#getThreatIntelKey(String, String)}. */
+  @Override
+  protected String getKeyName(String type, String field) {
+    return ThreatIntelUtils.getThreatIntelKey(type, field);
+  }
+
+  /**
+   * @param sensorType sensor type whose enrichment config is looked up
+   * @return the threat-intel field map of that sensor's enrichment config
+   */
+  @Override
+  protected Map<String, List<String>> getFieldMap(String sensorType) {
+    // NOTE(review): a sensor type without an enrichment config raises a NullPointerException
+    // here -- confirm whether the parent is expected to tolerate a null field map instead.
+    return configurations
+            .getSensorEnrichmentConfig(sensorType)
+            .getThreatIntelFieldMap();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/cli/LatencySummarizer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/cli/LatencySummarizer.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/cli/LatencySummarizer.java
new file mode 100644
index 0000000..b40f2ad
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/cli/LatencySummarizer.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.cli;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import org.apache.commons.cli.*;
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.metron.common.utils.JSONUtils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.util.*;
+
+/**
+ * Command-line tool that reads one JSON document per line from standard input, treats every
+ * field whose name ends in <tt>.ts</tt> as a numeric timestamp, and prints statistics on the
+ * differences between those timestamps.
+ * <p>
+ * Within a document the metrics are ordered by timestamp; the "distance" of a pair is how many
+ * positions apart its two metrics are in that ordering.
+ */
+public class LatencySummarizer {
+  /** An ordered (earlier metric, later metric) pair used as a statistics key. */
+  public static class Pair extends AbstractMap.SimpleEntry<String, String> {
+    public Pair(String key, String value) {
+      super(key, value);
+    }
+  }
+
+  /** Accumulates latency samples per distance and per metric pair. */
+  public static class LatencyStats {
+    private final NavigableMap<Integer, Map<Pair, DescriptiveStatistics>> depthMap = new TreeMap<>();
+    // Starts empty so that summarizing before any document was seen does not fail.
+    private List<String> metrics = Collections.emptyList();
+
+    /**
+     * Replaces the metric ordering shown as the "Flow" line of the summary.
+     *
+     * @param metrics metric names ordered by timestamp; null is treated as empty
+     */
+    public void updateMetrics(List<String> metrics) {
+      this.metrics = metrics == null ? Collections.<String>emptyList() : metrics;
+    }
+
+    /** @return the statistics recorded at {@code depth}, creating an empty map on first use */
+    public Map<Pair, DescriptiveStatistics> getStatsMap(int depth) {
+      Map<Pair, DescriptiveStatistics> statsMap = depthMap.get(depth);
+      if(statsMap == null) {
+        statsMap = new HashMap<>();
+        depthMap.put(depth, statsMap);
+      }
+      return statsMap;
+    }
+
+    /** @return the statistics for {@code p} at {@code depth}, creating them on first use */
+    public DescriptiveStatistics getStats( int depth, Pair p) {
+      Map<Pair, DescriptiveStatistics> statsMap = getStatsMap(depth);
+      DescriptiveStatistics stats = statsMap.get(p);
+      if(stats == null) {
+        stats = new DescriptiveStatistics();
+        statsMap.put(p, stats);
+      }
+      return stats;
+    }
+
+    /** Records one latency sample for the pair at the given distance. */
+    public void put(int depth, Pair p, double val) {
+      getStats(depth, p).addValue(val);
+    }
+
+    /**
+     * Prints the statistics for one pair.
+     *
+     * @param title      heading printed before the figures
+     * @param statistics the samples to summarize
+     * @param pw         destination stream
+     * @param meanOnly   when true only the mean is printed, otherwise mean, min, max,
+     *                   percentiles and standard deviation
+     */
+    public static void summary(String title, DescriptiveStatistics statistics, PrintStream pw, boolean meanOnly) {
+      if(meanOnly) {
+        pw.println(title + ": "
+                + "\n\tMean: " + statistics.getMean()
+        );
+      }
+      else {
+        pw.println(title + ": "
+                + "\n\tMean: " + statistics.getMean()
+                + "\n\tMin: " + statistics.getMin()
+                + "\n\t1th: " + statistics.getPercentile(1)
+                + "\n\t5th: " + statistics.getPercentile(5)
+                + "\n\t10th: " + statistics.getPercentile(10)
+                + "\n\t25th: " + statistics.getPercentile(25)
+                + "\n\t50th: " + statistics.getPercentile(50)
+                + "\n\t90th: " + statistics.getPercentile(90)
+                + "\n\t95th: " + statistics.getPercentile(95)
+                + "\n\t99th: " + statistics.getPercentile(99)
+                + "\n\tMax: " + statistics.getMax()
+                + "\n\tStdDev: " + statistics.getStandardDeviation()
+        );
+      }
+    }
+
+    /**
+     * Prints every pair recorded at {@code depth}, largest mean first. A depth without any
+     * samples prints just its heading.
+     */
+    public void printDepthSummary(int depth, boolean meanOnly) {
+      Map<Pair, DescriptiveStatistics> statsMap = depthMap.get(depth);
+      if(statsMap == null) {
+        // Read-only lookup: do not create an entry merely because someone asked to print it.
+        statsMap = Collections.emptyMap();
+      }
+      System.out.println("\nDistance " + depth);
+      System.out.println("----------------\n");
+      List<Map.Entry<Pair, DescriptiveStatistics>> sortedStats = new ArrayList<>(statsMap.entrySet());
+      Collections.sort(sortedStats, new Comparator<Map.Entry<Pair, DescriptiveStatistics>>() {
+        @Override
+        public int compare(Map.Entry<Pair, DescriptiveStatistics> o1, Map.Entry<Pair, DescriptiveStatistics> o2) {
+          // Arguments swapped to sort by descending mean.
+          return Double.compare(o2.getValue().getMean(), o1.getValue().getMean());
+        }
+      });
+      for(Map.Entry<Pair, DescriptiveStatistics> stats : sortedStats) {
+        summary(stats.getKey().getKey() + " -> " + stats.getKey().getValue(), stats.getValue(), System.out, meanOnly);
+      }
+    }
+
+    /** Prints the most recent metric flow followed by one section per recorded distance. */
+    public void printSummary(boolean meanOnly) {
+      System.out.println("Flow:");
+      System.out.println("\t" + Joiner.on(" -> ").join(metrics));
+      System.out.println("\nSUMMARY BY DISTANCE\n--------------------------");
+      for(int depth : depthMap.keySet()) {
+        printDepthSummary(depth, meanOnly);
+      }
+    }
+
+  }
+
+  /**
+   * @param s a dot-separated field name such as <tt>a.b.ts</tt>
+   * @return {@code s} without its last dot-separated token (<tt>a.b</tt> for the example)
+   */
+  public static String getBaseMetric(String s) {
+    Iterable<String> tokenIt = Splitter.on('.').split(s);
+    int num = Iterables.size(tokenIt);
+    return Joiner.on('.').join(Iterables.limit(tokenIt, num-1));
+  }
+
+  /**
+   * Extracts the <tt>.ts</tt> fields of one document, orders the metrics by timestamp and adds
+   * the difference of every ordered pair to {@code stats}.
+   *
+   * @param stats accumulator to update
+   * @param doc   one parsed input document
+   * @throws NumberFormatException if a <tt>.ts</tt> value is not a parseable long
+   */
+  public static void updateStats(LatencyStats stats, Map<String, Object> doc) {
+    final Map<String, Long> latencyMap = new HashMap<>();
+    for(Map.Entry<String, Object> kv : doc.entrySet()) {
+      if(kv.getKey().endsWith(".ts")) {
+        latencyMap.put(getBaseMetric(kv.getKey()), Long.parseLong(kv.getValue().toString()));
+      }
+    }
+    // Sort the metric names themselves rather than indexing them by timestamp: two metrics
+    // stamped with the same value must both survive. Ties are broken by name so the order
+    // is deterministic.
+    List<String> metrics = new ArrayList<>(latencyMap.keySet());
+    Collections.sort(metrics, new Comparator<String>() {
+      @Override
+      public int compare(String left, String right) {
+        int byTimestamp = Long.compare(latencyMap.get(left), latencyMap.get(right));
+        return byTimestamp != 0 ? byTimestamp : left.compareTo(right);
+      }
+    });
+    stats.updateMetrics(metrics);
+    for(int i = 0;i < metrics.size();++i) {
+      for(int j = i+1;j < metrics.size();++j) {
+        Pair p = new Pair(metrics.get(i), metrics.get(j));
+        long ms = latencyMap.get(metrics.get(j)) - latencyMap.get(metrics.get(i));
+        stats.put(j-i, p, ms);
+      }
+    }
+  }
+
+  /** Builds the command-line options: <tt>-h/--help</tt> and <tt>-m/--mean_only</tt>. */
+  private static Options buildOptions() {
+    Options options = new Options();
+    Option help = new Option("h", "help", false, "This screen");
+    help.setRequired(false);
+    options.addOption(help);
+    Option meanOnly = new Option("m", "mean_only", false, "Print the mean only when we summarize");
+    meanOnly.setRequired(false);
+    options.addOption(meanOnly);
+    return options;
+  }
+
+  /** Prints the usage screen for the given options. */
+  private static void printHelp(Options options) {
+    final HelpFormatter usageFormatter = new HelpFormatter();
+    usageFormatter.printHelp(LatencySummarizer.class.getSimpleName().toLowerCase(), null, options, null, true);
+  }
+
+  /**
+   * Entry point: parses the options, consumes standard input line by line and prints the summary.
+   * Exits with -1 on unparseable arguments and with 0 after printing help.
+   *
+   * @throws IOException if reading standard input or parsing a line fails
+   */
+  public static void main(String... argv) throws IOException {
+    Options options = buildOptions();
+    CommandLineParser parser = new PosixParser();
+    CommandLine cmd = null;
+    try {
+      cmd = parser.parse(options, argv);
+    }
+    catch(ParseException pe) {
+      pe.printStackTrace();
+      printHelp(options);
+      System.exit(-1);
+    }
+    if( cmd.hasOption("h") ){
+      printHelp(options);
+      System.exit(0);
+    }
+    LatencyStats statsMap = new LatencyStats();
+    BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
+    for(String line = null;(line = reader.readLine()) != null;) {
+      if(line.trim().isEmpty()) {
+        // A blank line (e.g. a trailing newline) is not a document; skip it.
+        continue;
+      }
+      Map<String, Object> doc = JSONUtils.INSTANCE.load(line, new TypeReference<HashMap<String, Object>>() {});
+      updateStats(statsMap, doc);
+    }
+    statsMap.printSummary(cmd.hasOption('m'));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/configuration/Enrichment.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/configuration/Enrichment.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/configuration/Enrichment.java
new file mode 100644
index 0000000..736a911
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/configuration/Enrichment.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.configuration;
+
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Serializable holder that ties an enrichment type name to the fields it applies to and the
+ * adapter that performs it.
+ *
+ * @param <T> concrete adapter type
+ */
+public class Enrichment<T extends EnrichmentAdapter> implements Serializable {
+
+  private String type;
+  private List<String> fields;
+  private T adapter;
+
+  /** Creates an empty instance; populate it through the setters. */
+  public Enrichment() {}
+
+  /**
+   * @param type    name of the enrichment
+   * @param adapter adapter that performs the enrichment
+   */
+  public Enrichment(String type, T adapter) {
+    this.adapter = adapter;
+    this.type = type;
+  }
+
+  /** @return the enrichment type name, or null if not set */
+  public String getType() {
+    return this.type;
+  }
+
+  /** @param type the enrichment type name */
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  /** @return the fields this enrichment applies to, or null if not set */
+  public List<String> getFields() {
+    return this.fields;
+  }
+
+  /** @param fields the fields this enrichment applies to; stored by reference, not copied */
+  public void setFields(List<String> fields) {
+    this.fields = fields;
+  }
+
+  /** @return the adapter, or null if not set */
+  public T getAdapter() {
+    return this.adapter;
+  }
+
+  /** @param adapter the adapter that performs the enrichment */
+  public void setAdapter(T adapter) {
+    this.adapter = adapter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/AbstractConverter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/AbstractConverter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/AbstractConverter.java
new file mode 100644
index 0000000..4b57677
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/AbstractConverter.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.enrichment.converter;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.apache.metron.enrichment.lookup.LookupKey;
+import org.apache.metron.enrichment.lookup.LookupValue;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.*;
+
+
+/**
+ * Base {@link HbaseConverter} that translates lookup keys and values to and
+ * from HBase {@link Put}, {@link Get} and {@link Result} objects.
+ *
+ * <p>The row key is whatever {@link LookupKey#toBytes()} produces; the columns
+ * are whatever {@link LookupValue#toColumns()} produces, all written under a
+ * single column family. Subclasses supply the concrete key/value instances to
+ * populate when converting back from HBase objects.
+ *
+ * @param <KEY_T>   lookup key type
+ * @param <VALUE_T> lookup value type
+ */
+public abstract class AbstractConverter<KEY_T extends LookupKey, VALUE_T extends LookupValue> implements HbaseConverter<KEY_T,VALUE_T> {
+  /**
+   * Maps an HBase cell onto a (qualifier, value) entry.
+   * Declared {@code final} so the shared instance cannot be swapped out at runtime.
+   */
+  public static final Function<Cell, Map.Entry<byte[], byte[]>> CELL_TO_ENTRY  = new Function<Cell, Map.Entry<byte[], byte[]>>() {
+
+    @Nullable
+    @Override
+    public Map.Entry<byte[], byte[]> apply(@Nullable Cell cell) {
+      return new AbstractMap.SimpleEntry<>(cell.getQualifier(), cell.getValue());
+    }
+  };
+
+  /**
+   * Returns the cells a Put holds for one column family.
+   *
+   * @return the cell list, or an empty list when the Put has nothing for that family
+   */
+  private static List<Cell> cellsOf(Put put, String columnFamily) {
+    List<Cell> cells = put.getFamilyCellMap().get(Bytes.toBytes(columnFamily));
+    if(cells == null) {
+      // the family map has no entry for an untouched family; an empty list keeps callers NPE-free
+      cells = Collections.emptyList();
+    }
+    return cells;
+  }
+
+  /**
+   * Builds a Put whose row is {@code key.toBytes()} and whose columns, all in
+   * {@code columnFamily}, are the entries of {@code values.toColumns()}.
+   */
+  @Override
+  public Put toPut(String columnFamily, KEY_T key, VALUE_T values) throws IOException {
+    Put put = new Put(key.toBytes());
+    byte[] cf = Bytes.toBytes(columnFamily);
+    for(Map.Entry<byte[], byte[]> kv : values.toColumns()) {
+      put.add(cf, kv.getKey(), kv.getValue());
+    }
+    return put;
+  }
+
+  /**
+   * Populates {@code key} from the Put's row and {@code value} from the Put's
+   * cells in {@code columnFamily}, then pairs them up.
+   *
+   * @param key   instance to fill in; mutated by this call
+   * @param value instance to fill in; mutated by this call. It receives an
+   *              empty column set when the Put has no cells for the family.
+   */
+  public LookupKV<KEY_T, VALUE_T> fromPut(Put put, String columnFamily, KEY_T key, VALUE_T value) throws IOException {
+    key.fromBytes(put.getRow());
+    value.fromColumns(Iterables.transform(cellsOf(put, columnFamily), CELL_TO_ENTRY));
+    return new LookupKV<>(key, value);
+  }
+
+  /**
+   * Builds the Result that reading the row written by
+   * {@link #toPut(String, LookupKey, LookupValue)} would contain.
+   * A value without columns yields a Result created from an empty cell list.
+   */
+  @Override
+  public Result toResult(String columnFamily, KEY_T key, VALUE_T values) throws IOException {
+    Put put = toPut(columnFamily, key, values);
+    return Result.create(cellsOf(put, columnFamily));
+  }
+
+  /**
+   * Populates {@code key} from the Result's row and {@code value} from the
+   * Result's columns in {@code columnFamily}.
+   *
+   * @param key   instance to fill in; mutated by this call
+   * @param value instance to fill in; mutated by this call
+   * @return the populated pair, or {@code null} when {@code result} is
+   *         {@code null} or has no row
+   */
+  public LookupKV<KEY_T, VALUE_T> fromResult(Result result, String columnFamily, KEY_T key, VALUE_T value) throws IOException {
+    if(result == null || result.getRow() == null) {
+      return null;
+    }
+    key.fromBytes(result.getRow());
+    byte[] cf = Bytes.toBytes(columnFamily);
+    NavigableMap<byte[], byte[]> cols = result.getFamilyMap(cf);
+    value.fromColumns(cols.entrySet());
+    return new LookupKV<>(key, value);
+  }
+
+  /** Builds a Get for the row {@code key.toBytes()}, restricted to {@code columnFamily}. */
+  @Override
+  public Get toGet(String columnFamily, KEY_T key) {
+    Get ret = new Get(key.toBytes());
+    ret.addFamily(Bytes.toBytes(columnFamily));
+    return ret;
+  }
+
+  /**
+   * Pairs up a flat {@code name, value, name, value, ...} argument list.
+   *
+   * @param kvs alternating column names and values
+   * @return one immutable entry per (name, value) pair, in argument order
+   * @throws IllegalStateException if an odd number of arrays is supplied
+   */
+  public static Iterable<Map.Entry<byte[], byte[]>> toEntries(byte[]... kvs) {
+    if(kvs.length % 2 != 0)  {
+      throw new IllegalStateException("Must be an even size");
+    }
+    List<Map.Entry<byte[], byte[]>> ret = new ArrayList<>(kvs.length/2);
+    for(int i = 0;i < kvs.length;i += 2) {
+      ret.add(new AbstractMap.SimpleImmutableEntry<>(kvs[i], kvs[i+1])) ;
+    }
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentConverter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentConverter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentConverter.java
new file mode 100644
index 0000000..6f19781
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentConverter.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.converter;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.metron.enrichment.lookup.LookupKV;
+
+import java.io.IOException;
+
+public class EnrichmentConverter extends AbstractConverter<EnrichmentKey, EnrichmentValue> {
+
+  @Override
+  public LookupKV<EnrichmentKey, EnrichmentValue> fromPut(Put put, String columnFamily) throws IOException {
+    return fromPut(put, columnFamily, new EnrichmentKey(), new EnrichmentValue());
+  }
+
+  @Override
+  public LookupKV<EnrichmentKey, EnrichmentValue> fromResult(Result result, String columnFamily) throws IOException {
+    return fromResult(result, columnFamily, new EnrichmentKey(), new EnrichmentValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentHelper.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentHelper.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentHelper.java
new file mode 100644
index 0000000..475ee8c
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentHelper.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.converter;
+
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.metron.enrichment.lookup.LookupKV;
+
+import java.io.IOException;
+
+/**
+ * Enum singleton that writes enrichment key/value pairs into an HBase table.
+ */
+public enum EnrichmentHelper {
+    INSTANCE;
+    EnrichmentConverter converter = new EnrichmentConverter();
+
+    /**
+     * Converts every pair to a Put and writes it to the table, one Put per
+     * pair, in iteration order.
+     *
+     * @param table   destination table
+     * @param cf      column family the values are written under
+     * @param results pairs to store
+     * @throws IOException if a conversion or a table write fails; pairs written
+     *                     before the failure are not rolled back here
+     */
+    public void load(HTableInterface table, String cf, Iterable<LookupKV<EnrichmentKey, EnrichmentValue>> results) throws IOException {
+        for(LookupKV<EnrichmentKey, EnrichmentValue> pair : results) {
+            EnrichmentKey rowKey = pair.getKey();
+            EnrichmentValue rowValue = pair.getValue();
+            table.put(converter.toPut(cf, rowKey, rowValue));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentKey.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentKey.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentKey.java
new file mode 100644
index 0000000..6201ad1
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentKey.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.converter;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.enrichment.lookup.LookupKey;
+
+import java.io.*;
+
+/**
+ * Lookup key made of an enrichment type and an indicator.
+ *
+ * <p>The serialized row key is a 16-byte seeded murmur3-128 hash of the
+ * indicator, followed by the type and the indicator each written with
+ * {@link DataOutputStream#writeUTF(String)}.
+ */
+public class EnrichmentKey implements LookupKey {
+  private static final int SEED = 0xDEADBEEF;
+  // length in bytes of the hash prefix that toBytes() prepends and fromBytes() skips
+  private static final int HASH_PREFIX_SIZE=16;
+  // One holder for the whole class instead of one per key instance: keys are
+  // created per lookup, and every instance used the same seed anyway, so the
+  // produced hashes are unchanged.
+  static final ThreadLocal<HashFunction> hFunction= new ThreadLocal<HashFunction>() {
+    @Override
+    protected HashFunction initialValue() {
+      return Hashing.murmur3_128(SEED);
+    }
+  };
+
+  public String indicator;
+  public String type;
+
+  /** Creates an empty key, typically to be filled in by {@link #fromBytes(byte[])}. */
+  public EnrichmentKey() {
+
+  }
+
+  /**
+   * @param type      enrichment type
+   * @param indicator indicator value
+   */
+  public EnrichmentKey(String type, String indicator) {
+    this.indicator = indicator;
+    this.type = type;
+  }
+
+  /** Serializes the type followed by the indicator, both in {@code writeUTF} format. */
+  private byte[] typedIndicatorToBytes() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream w = new DataOutputStream(baos);
+    w.writeUTF(type);
+    w.writeUTF(indicator);
+    w.flush();
+    return baos.toByteArray();
+  }
+
+  /**
+   * Serializes this key as {@code hash(indicator) + type + indicator}.
+   *
+   * @return the row key bytes
+   * @throws RuntimeException if the type or indicator cannot be written
+   */
+  @Override
+  public byte[] toBytes() {
+    byte[] typedIndicator;
+    try {
+      typedIndicator = typedIndicatorToBytes();
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to convert type and indicator to bytes", e);
+    }
+    // only the indicator feeds the hash prefix; the type is not part of it
+    Hasher hasher = hFunction.get().newHasher();
+    hasher.putBytes(Bytes.toBytes(indicator));
+    byte[] prefix = hasher.hash().asBytes();
+    byte[] val = new byte[prefix.length + typedIndicator.length];
+    System.arraycopy(prefix, 0, val, 0, prefix.length);
+    System.arraycopy(typedIndicator, 0, val, prefix.length, typedIndicator.length);
+    return val;
+  }
+
+  /**
+   * Restores the type and indicator from a row key produced by {@link #toBytes()}.
+   * The hash prefix is skipped, not verified.
+   *
+   * @param row the row key bytes
+   * @throws RuntimeException if the type or indicator cannot be read, e.g.
+   *                          because the row is too short
+   */
+  @Override
+  public void fromBytes(byte[] row) {
+    ByteArrayInputStream rowStream = new ByteArrayInputStream(row);
+    rowStream.skip(HASH_PREFIX_SIZE);
+    DataInputStream in = new DataInputStream(rowStream);
+    try {
+      type = in.readUTF();
+      indicator = in.readUTF();
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to convert type and indicator from bytes", e);
+    }
+  }
+
+  /** Two keys are equal when both their indicators and their types are equal. */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    EnrichmentKey that = (EnrichmentKey) o;
+
+    if (indicator != null ? !indicator.equals(that.indicator) : that.indicator != null) return false;
+    return type != null ? type.equals(that.type) : that.type == null;
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = indicator != null ? indicator.hashCode() : 0;
+    result = 31 * result + (type != null ? type.hashCode() : 0);
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "EnrichmentKey{" +
+            "indicator='" + indicator + '\'' +
+            ", type='" + type + '\'' +
+            '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentValue.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentValue.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentValue.java
new file mode 100644
index 0000000..d9b7b38
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentValue.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.converter;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.enrichment.lookup.LookupValue;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Lookup value holding a string-to-string metadata map.
+ *
+ * <p>The map is stored as a single column, qualifier {@value #VALUE_COLUMN_NAME},
+ * whose content is the map serialized to a JSON string.
+ */
+public class EnrichmentValue implements LookupValue {
+    // one ObjectMapper per thread, created on first use
+    private static final ThreadLocal<ObjectMapper> _mapper = new ThreadLocal<ObjectMapper>() {
+        @Override
+        protected ObjectMapper initialValue() {
+            return new ObjectMapper();
+        }
+    };
+    public static final String VALUE_COLUMN_NAME = "v";
+    public static final byte[] VALUE_COLUMN_NAME_B = Bytes.toBytes(VALUE_COLUMN_NAME);
+
+    private Map<String, String> metadata = null;
+
+    /** Creates a value without metadata, typically to be filled in by {@link #fromColumns(Iterable)}. */
+    public EnrichmentValue()
+    {
+
+    }
+
+    /** @param metadata the metadata map to hold; kept by reference, not copied */
+    public EnrichmentValue(Map<String, String> metadata) {
+        this.metadata = metadata;
+    }
+
+    /** @return the metadata map, or {@code null} if none was set or decoded */
+    public Map<String, String> getMetadata() {
+        return metadata;
+    }
+
+    /** @return a single (qualifier, value) entry carrying the JSON-serialized metadata */
+    @Override
+    public Iterable<Map.Entry<byte[], byte[]>> toColumns() {
+        return AbstractConverter.toEntries( VALUE_COLUMN_NAME_B, Bytes.toBytes(valueToString(metadata))
+                                  );
+    }
+
+    /**
+     * Replaces the metadata with the content of the value column, if present.
+     * Columns with any other qualifier are ignored.
+     */
+    @Override
+    public void fromColumns(Iterable<Map.Entry<byte[], byte[]>> values) {
+        for(Map.Entry<byte[], byte[]> cell : values) {
+            if(Bytes.equals(cell.getKey(), VALUE_COLUMN_NAME_B)) {
+                metadata = stringToValue(Bytes.toString(cell.getValue()));
+            }
+        }
+    }
+
+    /**
+     * Parses a JSON string into a metadata map.
+     *
+     * @throws RuntimeException wrapping the underlying {@link IOException} if parsing fails
+     */
+    public Map<String, String> stringToValue(String s){
+        try {
+            return _mapper.get().readValue(s, new TypeReference<Map<String, String>>(){});
+        } catch (IOException e) {
+            // keep the parser's exception as the cause so the real failure is not lost
+            throw new RuntimeException("Unable to convert string to metadata: " + s, e);
+        }
+    }
+
+    /**
+     * Serializes a metadata map to a JSON string.
+     *
+     * @throws RuntimeException wrapping the underlying {@link IOException} if serialization fails
+     */
+    public String valueToString(Map<String, String> value) {
+        try {
+            return _mapper.get().writeValueAsString(value);
+        } catch (IOException e) {
+            // keep the serializer's exception as the cause so the real failure is not lost
+            throw new RuntimeException("Unable to convert metadata to string: " + value, e);
+        }
+    }
+
+    /** Two values are equal when their metadata maps are equal (or both {@code null}). */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        EnrichmentValue that = (EnrichmentValue) o;
+
+        return getMetadata() != null ? getMetadata().equals(that.getMetadata()) : that.getMetadata() == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+        return getMetadata() != null ? getMetadata().hashCode() : 0;
+    }
+
+    @Override
+    public String toString() {
+        return "EnrichmentValue{" +
+                "metadata=" + metadata +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/HbaseConverter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/HbaseConverter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/HbaseConverter.java
new file mode 100644
index 0000000..7300b76
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/HbaseConverter.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.enrichment.converter;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.apache.metron.enrichment.lookup.LookupKey;
+import org.apache.metron.enrichment.lookup.LookupValue;
+
+import java.io.IOException;
+
+/**
+ * Converts lookup key/value pairs to and from the HBase client objects used
+ * to write ({@link Put}), read ({@link Get}) and return ({@link Result}) them.
+ *
+ * @param <KEY_T>   lookup key type
+ * @param <VALUE_T> lookup value type
+ */
+public interface HbaseConverter<KEY_T extends LookupKey, VALUE_T extends LookupValue> {
+    /** Builds the Put that stores {@code values} for {@code key} under {@code columnFamily}. */
+    Put toPut(String columnFamily, KEY_T key, VALUE_T values) throws IOException;
+
+    /** Decodes the key and the value held in {@code columnFamily} from a Put. */
+    LookupKV<KEY_T, VALUE_T> fromPut(Put put, String columnFamily) throws IOException;
+
+    /** Builds a Result representing {@code values} for {@code key} under {@code columnFamily}. */
+    Result toResult(String columnFamily, KEY_T key, VALUE_T values) throws IOException;
+
+    /** Decodes the key and the value held in {@code columnFamily} from a Result. */
+    LookupKV<KEY_T, VALUE_T> fromResult(Result result, String columnFamily) throws IOException;
+
+    /** Builds the Get used to look {@code key} up in {@code columnFamily}. */
+    Get toGet(String columnFamily, KEY_T key);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
new file mode 100644
index 0000000..28f9956
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.enrichment.interfaces;
+
+import org.json.simple.JSONObject;
+
+/**
+ * Contract implemented by enrichment adapters.
+ *
+ * NOTE(review): only the signatures are visible here; the per-method notes
+ * below are inferred from names and should be confirmed against an implementation.
+ *
+ * @param <T> type of the value handed to the adapter
+ */
+public interface EnrichmentAdapter<T>
+{
+	// presumably records that {@code value} was looked up - TODO confirm
+	void logAccess(T value);
+	// returns a JSON object for {@code value}; presumably the enrichment data - TODO confirm
+	JSONObject enrich(T value);
+	// presumably returns true when the adapter initialised successfully - TODO confirm
+	boolean initializeAdapter();
+	// presumably releases whatever initializeAdapter() acquired - TODO confirm
+	void cleanup();
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/EnrichmentLookup.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/EnrichmentLookup.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/EnrichmentLookup.java
new file mode 100644
index 0000000..f43f854
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/EnrichmentLookup.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.lookup;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.metron.enrichment.converter.HbaseConverter;
+import org.apache.metron.enrichment.converter.EnrichmentConverter;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.lookup.accesstracker.AccessTracker;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * {@link Lookup} over an HBase table of enrichment data. The lookup keeps the
+ * table it was given and closes it when the lookup itself is closed.
+ */
+public class EnrichmentLookup extends Lookup<HTableInterface, EnrichmentKey, LookupKV<EnrichmentKey,EnrichmentValue>> implements AutoCloseable {
+
+  /**
+   * Handler that answers existence checks and reads against an HBase table,
+   * restricted to one column family. The {@code logAccess} flag is accepted
+   * for interface compatibility but not used by this handler.
+   */
+  public static class Handler implements org.apache.metron.enrichment.lookup.handler.Handler<HTableInterface,EnrichmentKey,LookupKV<EnrichmentKey,EnrichmentValue>> {
+    String columnFamily;
+    HbaseConverter<EnrichmentKey, EnrichmentValue> converter = new EnrichmentConverter();
+
+    /** @param columnFamily the column family every Get is restricted to */
+    public Handler(String columnFamily) {
+      this.columnFamily = columnFamily;
+    }
+
+    /** @return whether the table has data for {@code key} in the column family */
+    @Override
+    public boolean exists(EnrichmentKey key, HTableInterface table, boolean logAccess) throws IOException {
+      return table.exists(converter.toGet(columnFamily, key));
+    }
+
+    /** @return the decoded pair for {@code key}, as produced by the converter's {@code fromResult} */
+    @Override
+    public LookupKV<EnrichmentKey, EnrichmentValue> get(EnrichmentKey key, HTableInterface table, boolean logAccess) throws IOException {
+      return converter.fromResult(table.get(converter.toGet(columnFamily, key)), columnFamily);
+    }
+
+    /** Builds one Get per key, in iteration order. */
+    private List<Get> keysToGets(Iterable<EnrichmentKey> keys) {
+      List<Get> ret = new ArrayList<>();
+      for(EnrichmentKey key : keys) {
+        ret.add(converter.toGet(columnFamily, key));
+      }
+      return ret;
+    }
+
+    /** @return one flag per key, in the order returned by the table's batch existence check */
+    @Override
+    public Iterable<Boolean> exists(Iterable<EnrichmentKey> key, HTableInterface table, boolean logAccess) throws IOException {
+      List<Boolean> ret = new ArrayList<>();
+      for(boolean b : table.existsAll(keysToGets(key))) {
+        ret.add(b);
+      }
+      return ret;
+    }
+
+    /** @return one decoded pair per Result of the table's batch get, in the order returned */
+    @Override
+    public Iterable<LookupKV<EnrichmentKey, EnrichmentValue>> get( Iterable<EnrichmentKey> keys
+                                                                 , HTableInterface table
+                                                                 , boolean logAccess
+                                                                 ) throws IOException
+    {
+      List<LookupKV<EnrichmentKey, EnrichmentValue>> ret = new ArrayList<>();
+      for(Result result : table.get(keysToGets(keys))) {
+        ret.add(converter.fromResult(result, columnFamily));
+      }
+      return ret;
+    }
+
+
+    /** Nothing to release: the handler holds no resources of its own. */
+    @Override
+    public void close() throws Exception {
+
+    }
+  }
+  private HTableInterface table;
+
+  /**
+   * @param table        table to query; closed by {@link #close()}
+   * @param columnFamily column family the lookups are restricted to
+   * @param tracker      access tracker to install on this lookup
+   */
+  public EnrichmentLookup(HTableInterface table, String columnFamily, AccessTracker tracker) {
+    this.table = table;
+    this.setLookupHandler(new Handler(columnFamily));
+    this.setAccessTracker(tracker);
+  }
+
+  /** @return the table this lookup was created with */
+  public HTableInterface getTable() {
+    return table;
+  }
+
+  /**
+   * Closes the inherited resources and then the table.
+   * The table is closed even when closing the inherited resources fails.
+   */
+  @Override
+  public void close() throws Exception {
+    try {
+      super.close();
+    } finally {
+      // must run even if super.close() throws, otherwise the table leaks
+      table.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/Lookup.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/Lookup.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/Lookup.java
new file mode 100644
index 0000000..265fccd
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/Lookup.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.lookup;
+
+import org.apache.metron.enrichment.lookup.accesstracker.AccessTracker;
+import org.apache.metron.enrichment.lookup.handler.Handler;
+
+import java.io.IOException;
+
+/**
+ * A {@link Handler} decorator that optionally reports every requested key to
+ * an {@link AccessTracker} before delegating to the configured lookup handler.
+ *
+ * @param <CONTEXT_T> context passed through to the handler (e.g. a table)
+ * @param <KEY_T>     lookup key type
+ * @param <RESULT_T>  result type produced by the handler
+ */
+public class Lookup<CONTEXT_T, KEY_T extends LookupKey, RESULT_T> implements Handler<CONTEXT_T, KEY_T, RESULT_T> {
+  private String name;
+  private AccessTracker accessTracker;
+  private Handler<CONTEXT_T, KEY_T, RESULT_T> lookupHandler;
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public AccessTracker getAccessTracker() {
+    return accessTracker;
+  }
+
+  public void setAccessTracker(AccessTracker accessTracker) {
+    this.accessTracker = accessTracker;
+  }
+
+  public Handler< CONTEXT_T, KEY_T, RESULT_T > getLookupHandler() {
+    return lookupHandler;
+  }
+
+  public void setLookupHandler(Handler< CONTEXT_T, KEY_T, RESULT_T > lookupHandler) {
+    this.lookupHandler = lookupHandler;
+  }
+
+  /** Reports every key of a batch to the access tracker, in iteration order. */
+  private void logAccesses(Iterable<KEY_T> keys) {
+    for (KEY_T k : keys) {
+      accessTracker.logAccess(k);
+    }
+  }
+
+  /**
+   * Checks a single key via the handler.
+   *
+   * @param logAccess when {@code true}, the key is reported to the access tracker first
+   */
+  @Override
+  public boolean exists(KEY_T key, CONTEXT_T context, boolean logAccess) throws IOException {
+    if(logAccess) {
+      accessTracker.logAccess(key);
+    }
+    return lookupHandler.exists(key, context, logAccess);
+  }
+
+  /**
+   * Fetches a single key via the handler.
+   *
+   * @param logAccess when {@code true}, the key is reported to the access tracker first
+   */
+  @Override
+  public RESULT_T get(KEY_T key, CONTEXT_T context, boolean logAccess) throws IOException {
+    if(logAccess) {
+      accessTracker.logAccess(key);
+    }
+    return lookupHandler.get(key, context, logAccess);
+  }
+
+  /**
+   * Checks a batch of keys via the handler.
+   *
+   * @param logAccess when {@code true}, every key is reported to the access
+   *                  tracker first; note this iterates {@code key} once before
+   *                  the handler iterates it again
+   */
+  @Override
+  public Iterable<Boolean> exists(Iterable<KEY_T> key, CONTEXT_T context, boolean logAccess) throws IOException {
+    if(logAccess) {
+      logAccesses(key);
+    }
+    return lookupHandler.exists(key, context, logAccess);
+  }
+
+
+  /**
+   * Fetches a batch of keys via the handler.
+   *
+   * @param logAccess when {@code true}, every key is reported to the access
+   *                  tracker first; note this iterates {@code key} once before
+   *                  the handler iterates it again
+   */
+  @Override
+  public Iterable<RESULT_T> get(Iterable<KEY_T> key, CONTEXT_T context, boolean logAccess) throws IOException {
+    if(logAccess) {
+      logAccesses(key);
+    }
+    return lookupHandler.get(key, context, logAccess);
+  }
+
+  /**
+   * Cleans up the access tracker and closes the handler. Either may be unset,
+   * and the handler is closed even when the tracker cleanup fails.
+   */
+  @Override
+  public void close() throws Exception {
+    try {
+      if(accessTracker != null) {
+        accessTracker.cleanup();
+      }
+    } finally {
+      // must run even if the tracker cleanup throws, otherwise the handler leaks
+      if(lookupHandler != null) {
+        lookupHandler.close();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/LookupKV.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/LookupKV.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/LookupKV.java
new file mode 100644
index 0000000..3538aab
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/LookupKV.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.enrichment.lookup;
+
+import java.io.Serializable;
+
+public class LookupKV<KEY_T extends LookupKey, VALUE_T extends LookupValue> implements Serializable {
+    private KEY_T key;
+    private VALUE_T value;
+    public LookupKV(KEY_T key, VALUE_T value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    public KEY_T getKey() {
+        return key;
+    }
+
+    public VALUE_T getValue() {
+        return value;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        LookupKV<?, ?> lookupKV = (LookupKV<?, ?>) o;
+
+        if (key != null ? !key.equals(lookupKV.key) : lookupKV.key != null) return false;
+        return value != null ? value.equals(lookupKV.value) : lookupKV.value == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = key != null ? key.hashCode() : 0;
+        result = 31 * result + (value != null ? value.hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "LookupKV{" +
+                "key=" + key +
+                ", value=" + value +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/LookupKey.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/LookupKey.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/LookupKey.java
new file mode 100644
index 0000000..b7ea00c
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/LookupKey.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.lookup;
+
+/**
+ * A lookup key that can be converted to, and populated from, a raw byte array.
+ */
+public interface LookupKey {
+    /**
+     * @return this key encoded as a byte array
+     */
+    byte[] toBytes();
+
+    /**
+     * Populates this key from a byte array.
+     *
+     * @param in the encoded key; presumably the format produced by
+     *           {@link #toBytes()} -- confirm against the implementations
+     */
+    void fromBytes(byte[] in);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/LookupValue.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/LookupValue.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/LookupValue.java
new file mode 100644
index 0000000..24fbffd
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/LookupValue.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.metron.enrichment.lookup;
+
+import java.util.Map;
+import java.util.NavigableMap;
+
+/**
+ * A lookup value that can be converted to, and populated from, a sequence of
+ * byte-array pairs.
+ */
+public interface LookupValue {
+    /**
+     * @return this value as a sequence of byte-array pairs; presumably each
+     *         entry is a column qualifier mapped to its cell contents --
+     *         confirm against the implementations
+     */
+    Iterable<Map.Entry<byte[], byte[]>> toColumns();
+
+    /**
+     * Populates this value from a sequence of byte-array pairs.
+     *
+     * @param values the pairs to read; presumably the shape produced by
+     *               {@link #toColumns()} -- confirm against the implementations
+     */
+    void fromColumns(Iterable<Map.Entry<byte[], byte[]>> values);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTracker.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTracker.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTracker.java
new file mode 100644
index 0000000..bde6604
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTracker.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.lookup.accesstracker;
+
+import org.apache.metron.enrichment.lookup.LookupKey;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Records which {@link LookupKey}s have been accessed and answers whether a
+ * given key has been seen. Implementations are {@link Serializable}.
+ */
+public interface AccessTracker extends Serializable{
+    /**
+     * Records an access to the given key.
+     *
+     * @param key the key that was accessed
+     */
+    void logAccess(LookupKey key);
+
+    /**
+     * Applies configuration to this tracker.
+     *
+     * @param config implementation-specific settings keyed by name
+     */
+    void configure(Map<String, Object> config);
+
+    /**
+     * @param key the key to test
+     * @return whether the tracker considers the key to have been accessed;
+     *         implementations may be approximate (the Bloom-filter tracker can
+     *         report false positives, the no-op tracker always answers false)
+     */
+    boolean hasSeen(LookupKey key);
+
+    /**
+     * @return the name identifying this tracker
+     */
+    String getName();
+
+    /**
+     * Combines this tracker with another one.
+     *
+     * @param tracker the tracker to merge in
+     * @return the combined tracker; NOTE(review): the no-op implementation
+     *         returns {@code null}, so callers should not assume non-null
+     */
+    AccessTracker union(AccessTracker tracker);
+
+    /** Discards the accesses recorded so far. */
+    void reset();
+
+    /**
+     * @return whether the tracker has reached its capacity
+     */
+    boolean isFull();
+
+    /**
+     * Releases any resources held by the tracker.
+     *
+     * @throws IOException if releasing a resource fails
+     */
+    void cleanup() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackerUtil.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackerUtil.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackerUtil.java
new file mode 100644
index 0000000..5d880f2
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackerUtil.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.lookup.accesstracker;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import javax.annotation.Nullable;
+import java.io.*;
+
+/**
+ * Singleton helper that (de)serializes {@link AccessTracker}s with Java
+ * serialization and stores them in / loads them from an HBase table.
+ */
+public enum AccessTrackerUtil {
+    INSTANCE;
+
+    /** Column qualifier ({@code "v"}) under which the serialized tracker is stored. */
+    public static byte[] COLUMN = Bytes.toBytes("v");
+
+    /**
+     * Rebuilds a tracker from its Java-serialized form.
+     *
+     * <p>NOTE(review): this uses native Java deserialization; it must only be
+     * fed bytes written by {@link #serializeTracker(AccessTracker)} from a
+     * trusted table.
+     *
+     * @param bytes the serialized tracker
+     * @return the deserialized tracker
+     * @throws IOException            if the stream is malformed
+     * @throws ClassNotFoundException if the tracker's class is not on the classpath
+     */
+    public AccessTracker deserializeTracker(byte[] bytes) throws IOException, ClassNotFoundException {
+        // try-with-resources closes the stream even when readObject() fails.
+        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
+            return (AccessTracker) ois.readObject();
+        }
+    }
+
+    /**
+     * Java-serializes a tracker.
+     *
+     * @param tracker the tracker to serialize
+     * @return the serialized bytes
+     * @throws IOException if serialization fails
+     */
+    public byte[] serializeTracker(AccessTracker tracker) throws IOException {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        // Closing the object stream flushes it into bos before the bytes are read.
+        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+            oos.writeObject(tracker);
+        }
+        return bos.toByteArray();
+    }
+
+
+    /**
+     * Writes the serialized tracker to the table under the row derived from
+     * {@code key}, in column {@code columnFamily:v}.
+     *
+     * @param accessTrackerTable the destination table
+     * @param columnFamily       the column family to write to
+     * @param key                supplies the row key
+     * @param underlyingTracker  the tracker to persist
+     * @throws IOException if serialization or the HBase put fails
+     */
+    public void persistTracker(HTableInterface accessTrackerTable, String columnFamily, PersistentAccessTracker.AccessTrackerKey key, AccessTracker underlyingTracker) throws IOException {
+        Put put = new Put(key.toRowKey());
+        put.add(Bytes.toBytes(columnFamily), COLUMN, serializeTracker(underlyingTracker));
+        accessTrackerTable.put(put);
+    }
+
+    /**
+     * Scans the table starting at the row key built from {@code name} and
+     * {@code earliest} and lazily deserializes each row into a tracker.
+     *
+     * <p>The returned iterable is a lazy view over the scanner, so
+     * deserialization failures surface as {@link RuntimeException}s during
+     * iteration rather than from this call.
+     *
+     * <p>NOTE(review): the {@link ResultScanner} is never closed; because the
+     * view is lazy it cannot be closed here without materializing every
+     * tracker in memory.
+     *
+     * @param accessTrackerTable the table to scan
+     * @param columnFamily       the column family holding the serialized trackers
+     * @param name               the tracker name used to build the scan start key
+     * @param earliest           the timestamp used to build the scan start key
+     * @return a lazy sequence of the deserialized trackers
+     * @throws IOException if the scanner cannot be opened
+     */
+    public Iterable<AccessTracker> loadAll(HTableInterface accessTrackerTable, final String columnFamily, final String name, final long earliest) throws IOException {
+        Scan scan = new Scan(PersistentAccessTracker.AccessTrackerKey.getTimestampScanKey(name, earliest));
+        ResultScanner scanner = accessTrackerTable.getScanner(scan);
+        return Iterables.transform(scanner, new Function<Result, AccessTracker>() {
+
+            @Nullable
+            @Override
+            public AccessTracker apply(@Nullable Result result) {
+                try {
+                    return deserializeTracker(result.getValue(Bytes.toBytes(columnFamily), COLUMN));
+                } catch (Exception e) {
+                    // Keep the original failure as the cause so the root problem is diagnosable.
+                    throw new RuntimeException("Unable to deserialize " + name + " @ " + earliest, e);
+                }
+            }
+        });
+    }
+
+
+    /**
+     * Folds a sequence of trackers into one by repeatedly calling
+     * {@link AccessTracker#union(AccessTracker)} on the running result.
+     *
+     * @param trackers the trackers to combine
+     * @return the combined tracker, or {@code null} when {@code trackers} is empty
+     * @throws IOException            declared for callers; not thrown by this method itself
+     * @throws ClassNotFoundException declared for callers; not thrown by this method itself
+     */
+    public AccessTracker loadAll(Iterable<AccessTracker> trackers) throws IOException, ClassNotFoundException {
+        AccessTracker tracker = null;
+        for(AccessTracker t : trackers) {
+            if(tracker == null) {
+                // First element (or the previous union yielded null): start from this tracker.
+                tracker = t;
+            }
+            else {
+                tracker = tracker.union(t);
+            }
+        }
+        return tracker;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/BloomAccessTracker.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/BloomAccessTracker.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/BloomAccessTracker.java
new file mode 100644
index 0000000..763ba59
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/BloomAccessTracker.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.lookup.accesstracker;
+
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnel;
+import com.google.common.hash.PrimitiveSink;
+import org.apache.metron.enrichment.lookup.LookupKey;
+
+import java.io.*;
+import java.util.Map;
+
+public class BloomAccessTracker implements AccessTracker {
+    private static final long serialVersionUID = 1L;
+    public static final String EXPECTED_INSERTIONS_KEY = "expected_insertions";
+    public static final String FALSE_POSITIVE_RATE_KEY = "false_positive_rate";
+    public static final String NAME_KEY = "name";
+
+    private static class LookupKeyFunnel implements Funnel<LookupKey> {
+        @Override
+        public void funnel(LookupKey lookupKey, PrimitiveSink primitiveSink) {
+            primitiveSink.putBytes(lookupKey.toBytes());
+        }
+
+
+        @Override
+        public boolean equals(Object obj) {
+            return this.getClass().equals(obj.getClass());
+        }
+
+    }
+
+    private static Funnel<LookupKey> LOOKUPKEY_FUNNEL = new LookupKeyFunnel();
+
+    BloomFilter<LookupKey> filter;
+    String name;
+    int expectedInsertions;
+    double falsePositiveRate;
+    int numInsertions = 0;
+
+    public BloomAccessTracker(String name, int expectedInsertions, double falsePositiveRate) {
+        this.name = name;
+        this.expectedInsertions = expectedInsertions;
+        this.falsePositiveRate = falsePositiveRate;
+        filter = BloomFilter.create(LOOKUPKEY_FUNNEL, expectedInsertions, falsePositiveRate);
+    }
+    public BloomAccessTracker() {}
+    public BloomAccessTracker(Map<String, Object> config) {
+        configure(config);
+    }
+
+    protected BloomFilter<LookupKey> getFilter() {
+        return filter;
+    }
+    @Override
+    public void logAccess(LookupKey key) {
+        numInsertions++;
+        filter.put(key);
+    }
+
+    @Override
+    public void configure(Map<String, Object> config) {
+        expectedInsertions = toInt(config.get(EXPECTED_INSERTIONS_KEY));
+        falsePositiveRate = toDouble(config.get(FALSE_POSITIVE_RATE_KEY));
+        name = config.get(NAME_KEY).toString();
+        filter = BloomFilter.create(LOOKUPKEY_FUNNEL, expectedInsertions, falsePositiveRate);
+    }
+
+    @Override
+    public boolean hasSeen(LookupKey key) {
+        return filter.mightContain(key);
+    }
+
+    @Override
+    public void reset() {
+        filter = BloomFilter.create(LOOKUPKEY_FUNNEL, expectedInsertions, falsePositiveRate);
+    }
+
+    private static double toDouble(Object o) {
+        if(o instanceof String) {
+            return Double.parseDouble((String)o);
+        }
+        else if(o instanceof Number) {
+            return ((Number) o).doubleValue();
+        }
+        else {
+            throw new IllegalStateException("Unable to convert " + o + " to a double.");
+        }
+    }
+    private static int toInt(Object o) {
+        if(o instanceof String) {
+            return Integer.parseInt((String)o);
+        }
+        else if(o instanceof Number) {
+            return ((Number) o).intValue();
+        }
+        else {
+            throw new IllegalStateException("Unable to convert " + o + " to a double.");
+        }
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+
+    @Override
+    public AccessTracker union(AccessTracker tracker) {
+        if(filter == null) {
+            throw new IllegalStateException("Unable to union access tracker, because this tracker is not initialized.");
+        }
+        if(tracker instanceof BloomAccessTracker ) {
+            filter.putAll(((BloomAccessTracker)tracker).getFilter());
+            return this;
+        }
+        else {
+            throw new IllegalStateException("Unable to union access tracker, because it's not of the right type (BloomAccessTracker)");
+        }
+    }
+
+    @Override
+    public boolean isFull() {
+        return numInsertions >= expectedInsertions;
+    }
+
+    @Override
+    public void cleanup() throws IOException {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/NoopAccessTracker.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/NoopAccessTracker.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/NoopAccessTracker.java
new file mode 100644
index 0000000..18cad3c
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/NoopAccessTracker.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.lookup.accesstracker;
+
+import org.apache.metron.enrichment.lookup.LookupKey;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * An {@link AccessTracker} that tracks nothing: every mutator is a no-op,
+ * no key is ever reported as seen, and the tracker is never full.
+ */
+public class NoopAccessTracker implements AccessTracker {
+
+  /** Ignores the access. */
+  @Override
+  public void logAccess(LookupKey key) {
+    // intentionally empty
+  }
+
+  /** Ignores the configuration. */
+  @Override
+  public void configure(Map<String, Object> config) {
+    // intentionally empty
+  }
+
+  /** @return always {@code false} */
+  @Override
+  public boolean hasSeen(LookupKey key) {
+    return false;
+  }
+
+  /** @return the fixed name {@code "noop"} */
+  @Override
+  public String getName() {
+    return "noop";
+  }
+
+  /** @return always {@code null}; nothing is merged */
+  @Override
+  public AccessTracker union(AccessTracker tracker) {
+    return null;
+  }
+
+  /** Nothing to reset. */
+  @Override
+  public void reset() {
+    // intentionally empty
+  }
+
+  /** @return always {@code false} */
+  @Override
+  public boolean isFull() {
+    return false;
+  }
+
+  /** Nothing to release. */
+  @Override
+  public void cleanup() throws IOException {
+    // intentionally empty
+  }
+}


Mime
View raw message