metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmerri...@apache.org
Subject [23/51] [partial] incubator-metron git commit: METRON-113 Project Reorganization (merrimanr) closes apache/incubator-metron#88
Date Tue, 26 Apr 2016 14:46:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/pom.xml b/metron-platform/metron-hbase/pom.xml
new file mode 100644
index 0000000..fc12808
--- /dev/null
+++ b/metron-platform/metron-hbase/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- 
+  Licensed to the Apache Software 
+	Foundation (ASF) under one or more contributor license agreements. See the 
+	NOTICE file distributed with this work for additional information regarding 
+	copyright ownership. The ASF licenses this file to You under the Apache License, 
+	Version 2.0 (the "License"); you may not use this file except in compliance 
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 
+	Unless required by applicable law or agreed to in writing, software distributed 
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+  the specific language governing permissions and limitations under the License. 
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <!-- This module is a child of metron-platform; groupId and version come from the parent. -->
+    <parent>
+        <groupId>org.apache.metron</groupId>
+        <artifactId>metron-platform</artifactId>
+        <version>0.1BETA</version>
+    </parent>
+    <artifactId>metron-hbase</artifactId>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <!-- NOTE(review): mysql.version, slf4j.version and storm.hdfs.version are not referenced
+             anywhere else in this POM; confirm whether they are still needed. -->
+        <mysql.version>5.1.31</mysql.version>
+        <slf4j.version>1.7.7</slf4j.version>
+        <storm.hdfs.version>0.1.2</storm.hdfs.version>
+        <!-- The global_* properties used below are not defined in this file; presumably they are
+             inherited from a parent POM (confirm). -->
+        <guava.version>${global_hbase_guava_version}</guava.version>
+    </properties>
+    <dependencies>
+        <!-- Sibling Metron module, pinned to the parent's version. -->
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-common</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <!-- HBase client API. "provided" scope: needed to compile, not packaged with this module.
+             The slf4j-log4j12 binding and log4j are excluded from its transitive dependencies. -->
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>${global_hbase_version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>provided</scope>
+        </dependency>
+        <!-- Storm core API, also "provided". servlet-api and the log4j-over-slf4j bridge are
+             excluded from its transitive dependencies. -->
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${global_storm_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <!-- log4j 1.2 is declared explicitly (it is excluded from hbase-client above) because the
+             sources in this module import org.apache.log4j.Logger. -->
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <version>1.2.17</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/Connector.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/Connector.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/Connector.java
new file mode 100644
index 0000000..e787e43
--- /dev/null
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/Connector.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.hbase;
+
+import org.apache.hadoop.hbase.client.Put;
+
+import java.io.IOException;
+
+/**
+ * Base class for objects that send HBase {@link Put} operations to the table described by a
+ * {@link TableConfig}.
+ * <p>
+ * The class itself only stores the values handed to its constructor; how they are used is up to
+ * the concrete subclass.
+ */
+public abstract class Connector {
+  /** Table configuration supplied at construction time. */
+  protected TableConfig tableConf;
+  /** Quorum string supplied at construction time, stored as given. */
+  protected String _quorum;
+  /** Port string supplied at construction time, stored as given. */
+  protected String _port;
+
+  /**
+   * Stores the supplied configuration and connection settings for use by subclasses.
+   *
+   * @param conf the table configuration
+   * @param _quorum the quorum setting to keep
+   * @param _port the port setting to keep
+   * @throws IOException never thrown by this constructor itself, which only assigns fields; the
+   *           clause is part of the signature subclasses build on
+   */
+  public Connector(final TableConfig conf, String _quorum, String _port) throws IOException {
+    this._port = _port;
+    this._quorum = _quorum;
+    this.tableConf = conf;
+  }
+
+  /**
+   * Sends a single put operation to the underlying table.
+   *
+   * @param put the operation to send
+   * @throws IOException if the operation cannot be delivered
+   */
+  public abstract void put(Put put) throws IOException;
+
+  /**
+   * Releases whatever the concrete connector holds open.
+   */
+  public abstract void close();
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/HTableConnector.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/HTableConnector.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/HTableConnector.java
new file mode 100644
index 0000000..ace4d80
--- /dev/null
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/HTableConnector.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.hbase;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Logger;
+
+import backtype.storm.generated.Bolt;
+
+/**
+ * HTable connector for Storm {@link Bolt}
+ * <p>
+ * The HBase configuration is picked up from the first <tt>hbase-site.xml</tt> encountered in the
+ * classpath
+ */
+@SuppressWarnings("serial")
+public class HTableConnector extends Connector implements Serializable{
+  private static final Logger LOG = Logger.getLogger(HTableConnector.class);
+  private Configuration conf;
+  protected HTableInterface table;
+  private String tableName;
+  private String connectorImpl;
+
+
+  /**
+   * Initialize HTable connection
+   * @param conf The {@link TupleTableConfig}
+   * @throws IOException
+   */
+  public HTableConnector(final TableConfig conf, String _quorum, String _port) throws IOException {
+    super(conf, _quorum, _port);
+    this.connectorImpl = conf.getConnectorImpl();
+    this.tableName = conf.getTableName();
+    this.conf = HBaseConfiguration.create();
+    
+    if(_quorum != null && _port != null)
+    {
+    	this.conf.set("hbase.zookeeper.quorum", _quorum);
+    	this.conf.set("hbase.zookeeper.property.clientPort", _port);
+    }
+
+    LOG.info(String.format("Initializing connection to HBase table %s at %s", tableName,
+      this.conf.get("hbase.rootdir")));
+
+    try {
+      this.table = getTableProvider().getTable(this.conf, this.tableName);
+    } catch (IOException ex) {
+      throw new IOException("Unable to establish connection to HBase table " + this.tableName, ex);
+    }
+
+    if (conf.isBatch()) {
+      // Enable client-side write buffer
+      this.table.setAutoFlush(false, true);
+      LOG.info("Enabled client-side write buffer");
+    }
+
+    // If set, override write buffer size
+    if (conf.getWriteBufferSize() > 0) {
+      try {
+        this.table.setWriteBufferSize(conf.getWriteBufferSize());
+
+        LOG.info("Setting client-side write buffer to " + conf.getWriteBufferSize());
+      } catch (IOException ex) {
+        LOG.error("Unable to set client-side write buffer size for HBase table " + this.tableName,
+          ex);
+      }
+    }
+
+    // Check the configured column families exist
+    for (String cf : conf.getColumnFamilies()) {
+      if (!columnFamilyExists(cf)) {
+        throw new RuntimeException(String.format(
+          "HBase table '%s' does not have column family '%s'", conf.getTableName(), cf));
+      }
+    }
+  }
+
+  protected TableProvider getTableProvider() throws IOException {
+    if(connectorImpl == null || connectorImpl.length() == 0 || connectorImpl.charAt(0) == '$') {
+      return new HTableProvider();
+    }
+    else {
+      try {
+        Class<? extends TableProvider> clazz = (Class<? extends TableProvider>) Class.forName(connectorImpl);
+        return clazz.getConstructor().newInstance();
+      } catch (InstantiationException e) {
+        throw new IOException("Unable to instantiate connector.", e);
+      } catch (IllegalAccessException e) {
+        throw new IOException("Unable to instantiate connector: illegal access", e);
+      } catch (InvocationTargetException e) {
+        throw new IOException("Unable to instantiate connector", e);
+      } catch (NoSuchMethodException e) {
+        throw new IOException("Unable to instantiate connector: no such method", e);
+      } catch (ClassNotFoundException e) {
+        throw new IOException("Unable to instantiate connector: class not found", e);
+      }
+    }
+  }
+
+  /**
+   * Checks to see if table contains the given column family
+   * @param columnFamily The column family name
+   * @return boolean
+   * @throws IOException
+   */
+  private boolean columnFamilyExists(final String columnFamily) throws IOException {
+    return this.table.getTableDescriptor().hasFamily(Bytes.toBytes(columnFamily));
+  }
+
+  /**
+   * @return the table
+   */
+  public HTableInterface getTable() {
+    return table;
+  }
+
+  @Override
+  public void put(Put put) throws IOException {
+      table.put(put);
+  }
+
+  /**
+   * Close the table
+   */
+  @Override
+  public void close() {
+    try {
+      this.table.close();
+    } catch (IOException ex) {
+      LOG.error("Unable to close connection to HBase table " + tableName, ex);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/HTableProvider.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/HTableProvider.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/HTableProvider.java
new file mode 100644
index 0000000..e454f04
--- /dev/null
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/HTableProvider.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+
+import java.io.IOException;
+
+/**
+ * {@link TableProvider} that opens a table by constructing an {@link HTable} directly.
+ */
+public class HTableProvider implements TableProvider {
+    /**
+     * Creates a new {@link HTable} for the given configuration and table name.
+     *
+     * @param config the configuration handed to the {@link HTable} constructor
+     * @param tableName the name of the table to open
+     * @return the newly created table handle
+     * @throws IOException if the {@link HTable} constructor fails
+     */
+    @Override
+    public HTableInterface getTable(Configuration config, String tableName) throws IOException {
+        final HTableInterface opened = new HTable(config, tableName);
+        return opened;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableConfig.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableConfig.java
new file mode 100644
index 0000000..de2e929
--- /dev/null
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableConfig.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.hbase;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * Serializable description of an HBase table target: its name, the column families to use, the
+ * client-side write-buffer settings and the optional class name of the provider implementation.
+ * <p>
+ * Values can be set either through the fluent {@code with...} methods, which return this
+ * instance, or through the bean-style setters.
+ */
+public class TableConfig implements Serializable {
+    static final long serialVersionUID = -1L;
+    private String tableName;
+    private boolean batch = true;
+    protected Map<String, Set<String>> columnFamilies = new HashMap<>();
+    private long writeBufferSize = 0L;
+    private String connectorImpl;
+
+    /**
+     * Creates a configuration without a table name; set one with {@link #withTable(String)}.
+     */
+    public TableConfig() {
+
+    }
+
+    /**
+     * @param tableName
+     *          The HBase table name
+     */
+    public TableConfig(String tableName) {
+        this.tableName = tableName;
+    }
+
+    /**
+     * @return the table name, or {@code null} if none has been set
+     */
+    public String getTableName() {
+        return tableName;
+    }
+
+    /**
+     * @param impl
+     *          The class name of the provider implementation to use
+     * @return this configuration
+     */
+    public TableConfig withConnectorImpl(String impl) {
+        connectorImpl = impl;
+        return this;
+    }
+
+    /**
+     * @param table
+     *          The HBase table name
+     * @return this configuration
+     */
+    public TableConfig withTable(String table) {
+        this.tableName = table;
+        return this;
+    }
+
+    /**
+     * Fluent counterpart of {@link #setBatch(boolean)}.
+     *
+     * @param isBatch
+     *          Whether to enable HBase's client-side write buffer; must not be
+     *          {@code null}
+     * @return this configuration
+     * @throws NullPointerException
+     *           if {@code isBatch} is {@code null}
+     */
+    public TableConfig withBatch(Boolean isBatch) {
+        if (isBatch == null) {
+            // Same exception type that auto-unboxing a null Boolean would raise, but with a
+            // message that tells the caller which argument was missing.
+            throw new NullPointerException("isBatch must not be null");
+        }
+        this.batch = isBatch;
+        return this;
+    }
+
+    /**
+     * @return the configured provider implementation class name, or {@code null} if none has
+     *         been set
+     */
+    public String getConnectorImpl() {
+        return connectorImpl;
+    }
+
+    /**
+     * @return Whether batch mode is enabled
+     */
+    public boolean isBatch() {
+        return batch;
+    }
+
+    /**
+     * @param batch
+     *          Whether to enable HBase's client-side write buffer.
+     *          <p>
+     *          When enabled your bolt will store put operations locally until the
+     *          write buffer is full, so they can be sent to HBase in a single RPC
+     *          call. When disabled each put operation is effectively an RPC and
+     *          is sent straight to HBase. As your bolt can process thousands of
+     *          values per second it is recommended that the write buffer is
+     *          enabled.
+     *          <p>
+     *          Enabled by default
+     */
+    public void setBatch(boolean batch) {
+        this.batch = batch;
+    }
+    /**
+     * @param writeBufferSize
+     *          Overrides the client-side write buffer size.
+     *          <p>
+     *          By default the write buffer size is 2 MB (2097152 bytes). If you
+     *          are storing larger data, you may want to consider increasing this
+     *          value to allow your bolt to efficiently group together a larger
+     *          number of records per RPC
+     *          <p>
+     *          Overrides the write buffer size you have set in your
+     *          hbase-site.xml e.g. <code>hbase.client.write.buffer</code>
+     */
+    public void setWriteBufferSize(long writeBufferSize) {
+        this.writeBufferSize = writeBufferSize;
+    }
+
+    /**
+     * @return the writeBufferSize; {@code 0} (the initial value) if it has not been set
+     */
+    public long getWriteBufferSize() {
+        return writeBufferSize;
+    }
+    /**
+     * @return A Set of configured column families. This is the key set of the internal map,
+     *         i.e. a view of it rather than a copy.
+     */
+    public Set<String> getColumnFamilies() {
+        return this.columnFamilies.keySet();
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableProvider.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableProvider.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableProvider.java
new file mode 100644
index 0000000..dc0569e
--- /dev/null
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableProvider.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Factory for {@link HTableInterface} handles.
+ * <p>
+ * The interface extends {@link Serializable}, so implementations must be serializable as well.
+ */
+public interface TableProvider extends Serializable {
+    /**
+     * Obtains a handle to the named table.
+     *
+     * @param config the configuration to use when obtaining the table
+     * @param tableName the name of the table
+     * @return the table handle
+     * @throws IOException if the table cannot be obtained
+     */
+    HTableInterface getTable(Configuration config, String tableName) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TupleTableConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TupleTableConfig.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TupleTableConfig.java
new file mode 100644
index 0000000..8257d8a
--- /dev/null
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TupleTableConfig.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.hbase;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+
+import com.google.common.base.Joiner;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.log4j.Logger;
+
+/**
+ * Configuration for Storm {@link Tuple} to HBase serialization.
+ * <p>
+ * In addition to the table settings inherited from {@link TableConfig} it names the tuple field
+ * used as row key, an optional tuple field used as cell timestamp, and the column families and
+ * qualifiers to extract from each tuple.
+ */
+@SuppressWarnings("serial")
+public class TupleTableConfig extends TableConfig implements Serializable {
+  private static final Logger LOG = Logger.getLogger(TupleTableConfig.class);
+  static final long serialVersionUID = -1L;
+  public static final long DEFAULT_INCREMENT = 1L;
+  
+  protected String tupleRowKeyField;
+  // Defaults to "" ("no timestamp field") so that instances built through the no-arg
+  // constructor and the with... methods can be used without ever setting a timestamp field.
+  protected String tupleTimestampField = "";
+  protected Durability durability = Durability.USE_DEFAULT;
+  private String fields;
+
+  /**
+   * Initialize configuration without a timestamp field
+   * 
+   * @param table
+   *          The HBase table name
+   * @param rowKeyField
+   *          The {@link Tuple} field used to set the rowKey
+   */
+  public TupleTableConfig(final String table, final String rowKeyField) {
+    this(table, rowKeyField, "");
+  }
+  
+  /**
+   * Initialize configuration
+   * 
+   * @param table
+   *          The HBase table name
+   * @param rowKeyField
+   *          The {@link Tuple} field used to set the rowKey
+   * @param timestampField
+   *          The {@link Tuple} field used to set the timestamp; an empty
+   *          string means that no timestamp is taken from the tuple
+   */
+  public TupleTableConfig(final String table, final String rowKeyField, final String timestampField) {
+    super(table);
+    this.tupleRowKeyField = rowKeyField;
+    this.tupleTimestampField = timestampField;
+    this.columnFamilies = new HashMap<String, Set<String>>();
+  }
+
+  /**
+   * Creates an empty configuration: no table name, no row key field and no timestamp field.
+   * Use the {@code with...} methods to fill it in.
+   */
+  public TupleTableConfig() {
+    super(null);
+    this.columnFamilies = new HashMap<String, Set<String>>();
+  }
+
+
+
+  /**
+   * @param rowKeyField
+   *          The {@link Tuple} field used to set the rowKey
+   * @return this configuration
+   */
+  public TupleTableConfig withRowKeyField(String rowKeyField) {
+    this.tupleRowKeyField = rowKeyField;
+    return this;
+  }
+
+  /**
+   * @param timestampField
+   *          The {@link Tuple} field used to set the timestamp
+   * @return this configuration
+   */
+  public TupleTableConfig withTimestampField(String timestampField) {
+    this.tupleTimestampField = timestampField;
+    return this;
+  }
+
+  /**
+   * @param fields
+   *          The value to store; it is kept as given and only exposed
+   *          through {@link #getFields()}
+   * @return this configuration
+   */
+  public TupleTableConfig withFields(String fields) {
+    this.fields = fields;
+    return this;
+  }
+
+
+
+  /**
+   * @return the value passed to {@link #withFields(String)}, or {@code null} if none was set
+   */
+  public String getFields() {
+    return fields;
+  }
+
+
+
+  /**
+   * Add column family and column qualifier to be extracted from tuple
+   * 
+   * @param columnFamily
+   *          The column family name
+   * @param columnQualifier
+   *          The column qualifier name
+   */
+  public void addColumn(final String columnFamily, final String columnQualifier) {
+    Set<String> columns = this.columnFamilies.get(columnFamily);
+    
+    if (columns == null) {
+      columns = new HashSet<String>();
+      this.columnFamilies.put(columnFamily, columns);
+    }
+    columns.add(columnQualifier);
+  }
+  
+  /**
+   * Creates a HBase {@link Put} from a Storm {@link Tuple}
+   * <p>
+   * The row key is read from the configured row key field. For every configured column family
+   * and qualifier, the tuple field named like the qualifier is read as binary and added to the
+   * put. If a timestamp field is configured and its value is greater than zero, that value is
+   * used as the cell timestamp; otherwise no explicit timestamp is set.
+   * 
+   * @param tuple
+   *          The {@link Tuple}
+   * @return {@link Put}
+   * @throws IOException
+   *           if the row key field cannot be retrieved from the tuple
+   */
+  public Put getPutFromTuple(final Tuple tuple) throws IOException{
+    byte[] rowKey = null;
+    try {
+      rowKey = Bytes.toBytes(tuple.getStringByField(tupleRowKeyField));
+    }
+    catch(IllegalArgumentException iae) {
+      throw new IOException("Unable to retrieve " + tupleRowKeyField + " from " + tuple + " [ " + Joiner.on(',').join(tuple.getFields()) + " ]", iae);
+    }
+    
+    long ts = 0;
+    // A null timestamp field (e.g. set explicitly through withTimestampField(null)) is treated
+    // like the empty string instead of failing with a NullPointerException.
+    if (tupleTimestampField != null && !tupleTimestampField.equals("")) {
+      ts = tuple.getLongByField(tupleTimestampField);
+    }
+    
+    Put p = new Put(rowKey);
+    
+    p.setDurability(durability);
+    
+    for (String cf : columnFamilies.keySet()) {
+      byte[] cfBytes = Bytes.toBytes(cf);
+      for (String cq : columnFamilies.get(cf)) {
+        byte[] cqBytes = Bytes.toBytes(cq);
+        byte[] val = tuple.getBinaryByField(cq);
+        
+        // addColumn is the non-deprecated form of Put.add and is what HBaseWriter in this
+        // module uses as well.
+        if (ts > 0) {
+          p.addColumn(cfBytes, cqBytes, ts, val);
+        } else {
+          p.addColumn(cfBytes, cqBytes, val);
+        }
+      }
+    }
+    
+    return p;
+  }
+  
+  /**
+   * Creates a HBase {@link Increment} from a Storm {@link Tuple}
+   * <p>
+   * For every configured column family and qualifier, the counter column is named after the
+   * string value of the tuple field with the qualifier's name; if the tuple has no such field,
+   * the qualifier name itself is used as the counter column.
+   * 
+   * @param tuple
+   *          The {@link Tuple}
+   * @param increment
+   *          The amount to increment the counter by
+   * @return {@link Increment}
+   */
+  public Increment getIncrementFromTuple(final Tuple tuple, final long increment) {
+    byte[] rowKey = Bytes.toBytes(tuple.getStringByField(tupleRowKeyField));
+    
+    Increment inc = new Increment(rowKey);
+    inc.setDurability(durability);
+    
+    for (String cf : columnFamilies.keySet()) {
+      byte[] cfBytes = Bytes.toBytes(cf);
+      for (String cq : columnFamilies.get(cf)) {
+        byte[] val;
+        try {
+          val = Bytes.toBytes(tuple.getStringByField(cq));
+        } catch (IllegalArgumentException ex) {
+          // if cq isn't a tuple field, use cq for counter instead of tuple
+          // value
+          val = Bytes.toBytes(cq);
+        }
+        inc.addColumn(cfBytes, val, increment);
+      }
+    }
+    
+    return inc;
+  }
+  
+  /**
+   * Increment the counter for the given family and column by the specified
+   * amount
+   * <p>
+   * If the family and column already exist in the Increment the counter value
+   * is incremented by the specified amount rather than overridden, as it is in
+   * HBase's {@link Increment#addColumn(byte[], byte[], long)} method
+   * <p>
+   * NOTE(review): this only has an effect if {@link Increment#getFamilyMapOfLongs()} returns a
+   * live map backed by the Increment. If the HBase client version in use returns a copy, the
+   * update made here is lost - confirm against the HBase version this module builds with.
+   * 
+   * @param inc
+   *          The {@link Increment} to update
+   * @param family
+   *          The column family
+   * @param qualifier
+   *          The column qualifier
+   * @param amount
+   *          The amount to increment the counter by
+   */
+  public static void addIncrement(Increment inc, final byte[] family, final byte[] qualifier, final Long amount) {
+    
+    NavigableMap<byte[], Long> set = inc.getFamilyMapOfLongs().get(family);
+    if (set == null) {
+      set = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+    }
+    
+    // If qualifier exists, increment amount
+    Long counter = set.get(qualifier);
+    if (counter == null) {
+      counter = 0L;
+    }
+    set.put(qualifier, amount + counter);
+    
+    inc.getFamilyMapOfLongs().put(family, set);
+  }
+  
+
+
+  /**
+   * @param durability
+   *          The {@link Durability} applied to every {@link Put} and
+   *          {@link Increment} created by this configuration.
+   *          <p>
+   *          Defaults to {@link Durability#USE_DEFAULT}
+   */
+  public void setDurability(Durability durability) {
+    this.durability = durability;
+  }
+  
+  
+  /**
+   * @return the durability applied to created operations
+   */
+  public Durability getDurability() {
+    return  durability;
+  }
+  
+
+
+  /**
+   * @return the tupleRowKeyField
+   */
+  public String getTupleRowKeyField() {
+    return tupleRowKeyField;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/writer/HBaseWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/writer/HBaseWriter.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/writer/HBaseWriter.java
new file mode 100644
index 0000000..1fd69b3
--- /dev/null
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/writer/HBaseWriter.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.hbase.writer;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.common.utils.ReflectionUtils;
+import org.apache.metron.common.interfaces.MessageWriter;
+import org.json.simple.JSONObject;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Base class for {@link MessageWriter}s that store each message as a single HBase {@link Put}.
+ * <p>
+ * Subclasses decide the row key, the cell timestamp and the column values of a message; this
+ * class opens the table in {@link #init()}, assembles and sends the put in
+ * {@link #write(String, Configurations, Tuple, JSONObject)} and closes the table in
+ * {@link #close()}.
+ */
+public abstract class HBaseWriter implements MessageWriter<JSONObject>, Serializable {
+
+  private String tableName;
+  private String connectorImpl;
+  private TableProvider provider;
+  private HTableInterface table;
+
+  /**
+   * @param tableName the name of the HBase table to write to
+   */
+  public HBaseWriter(String tableName) {
+    this.tableName = tableName;
+  }
+
+  /**
+   * @param connectorImpl the class name of the {@link TableProvider} implementation to use; it
+   *          is passed to {@code ReflectionUtils.createInstance} together with an
+   *          {@link HTableProvider} in {@link #init()}
+   * @return this writer
+   */
+  public HBaseWriter withProviderImpl(String connectorImpl) {
+    this.connectorImpl = connectorImpl;
+    return this;
+  }
+
+  /**
+   * Creates the table provider and opens the table.
+   *
+   * @throws IllegalStateException if the table cannot be opened. Failing here, with the cause
+   *           attached, replaces the previous behavior of printing the stack trace and leaving
+   *           the writer without a table, which only surfaced later as a NullPointerException
+   *           in {@code write}.
+   */
+  @Override
+  public void init() {
+    final Configuration config = HBaseConfiguration.create();
+    try {
+      provider = ReflectionUtils.createInstance(connectorImpl, new HTableProvider());
+      table = provider.getTable(config, tableName);
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to open HBase table " + tableName, e);
+    }
+  }
+
+  /**
+   * Builds one {@link Put} for the message and sends it to the table.
+   * <p>
+   * Every key of the map returned by {@link #getValues(Tuple, JSONObject)} must have the form
+   * {@code family:qualifier}; only the first {@code ':'} separates the two parts, so the
+   * qualifier may itself contain colons. A timestamp greater than -1 returned by
+   * {@link #getTimestamp(Tuple, JSONObject)} is used as the cell timestamp; otherwise no
+   * explicit timestamp is set.
+   *
+   * @throws IllegalStateException if the table has not been opened by {@link #init()}
+   * @throws IllegalArgumentException if a column name contains no {@code ':'}
+   * @throws Exception if the put cannot be sent
+   */
+  @Override
+  public void write(String sourceType, Configurations configurations, Tuple tuple, JSONObject message) throws Exception {
+    if (table == null) {
+      throw new IllegalStateException("HBase table " + tableName + " is not open; init() must be called before write()");
+    }
+    Put put = new Put(getKey(tuple, message));
+    Map<String, byte[]> values = getValues(tuple, message);
+    for(Map.Entry<String, byte[]> entry: values.entrySet()) {
+      String column = entry.getKey();
+      // Limit 2: split on the first ':' only and keep an empty trailing qualifier.
+      String[] columnParts = column.split(":", 2);
+      if (columnParts.length != 2) {
+        throw new IllegalArgumentException("Column '" + column + "' is not of the form family:qualifier");
+      }
+      // Queried once per column, as before; it is not hoisted because subclasses implement it
+      // and this class cannot assume the call is free of side effects.
+      long timestamp = getTimestamp(tuple, message);
+      if (timestamp > -1) {
+        put.addColumn(Bytes.toBytes(columnParts[0]), Bytes.toBytes(columnParts[1]), timestamp, entry.getValue());
+      } else {
+        put.addColumn(Bytes.toBytes(columnParts[0]), Bytes.toBytes(columnParts[1]), entry.getValue());
+      }
+    }
+    table.put(put);
+  }
+
+  /**
+   * Closes the table if it was opened; does nothing otherwise.
+   *
+   * @throws Exception if closing the table fails
+   */
+  @Override
+  public void close() throws Exception {
+    if (table != null) {
+      table.close();
+    }
+  }
+
+  /**
+   * @return the row key under which the message is stored
+   */
+  public abstract byte[] getKey(Tuple tuple, JSONObject message);
+
+  /**
+   * @return the cell timestamp to use, or a value below zero to set no explicit timestamp
+   */
+  public abstract long getTimestamp(Tuple tuple, JSONObject message);
+
+  /**
+   * @return the cell values to store, keyed by column name in the form {@code family:qualifier}
+   */
+  public abstract Map<String, byte[]> getValues(Tuple tuple, JSONObject message);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-integration-test/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/pom.xml b/metron-platform/metron-integration-test/pom.xml
new file mode 100644
index 0000000..b3c3f09
--- /dev/null
+++ b/metron-platform/metron-integration-test/pom.xml
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- 
+  Licensed to the Apache Software 
+  Foundation (ASF) under one or more contributor license agreements. See the 
+  NOTICE file distributed with this work for additional information regarding 
+  copyright ownership. The ASF licenses this file to You under the Apache License, 
+  Version 2.0 (the "License"); you may not use this file except in compliance 
+  with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 
+  Unless required by applicable law or agreed to in writing, software distributed 
+  under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+  OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+  the specific language governing permissions and limitations under the License. 
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.metron</groupId>
+    <artifactId>metron-platform</artifactId>
+    <version>0.1BETA</version>
+  </parent>
+  <artifactId>metron-integration-test</artifactId>
+  <description>Metron Integration Test</description>
+  <properties>
+  </properties>
+  <dependencies>
+    <!-- Logging and test framework -->
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>1.2.17</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${global_junit_version}</version>
+    </dependency>
+
+    <!-- Storm and Flux -->
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>flux-core</artifactId>
+      <version>${global_flux_version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-core</artifactId>
+      <version>${global_storm_version}</version>
+    </dependency>
+    <!-- NOTE(review): Guava version comes from global_hbase_guava_version; presumably pinned for HBase compatibility, confirm in the parent POM -->
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${global_hbase_guava_version}</version>
+    </dependency>
+    <!-- log4j is excluded from hbase-client; this module declares log4j 1.2.17 directly above -->
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <version>${global_hbase_version}</version>
+      <exclusions>
+        <exclusion>
+          <artifactId>log4j</artifactId>
+          <groupId>log4j</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <!-- The slf4j-log4j12 binding is excluded from hadoop-client's transitive dependencies -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${global_hadoop_version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <!-- Kafka artifact with the "test" classifier, declared in addition to the main Kafka artifact below -->
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.9.2</artifactId>
+      <version>${global_kafka_version}</version>
+      <classifier>test</classifier>
+      <exclusions>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.9.2</artifactId>
+      <version>${global_kafka_version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <!-- Sibling Metron modules; their versions are written out literally and match the parent version declared above -->
+    <dependency>
+      <groupId>org.apache.metron</groupId>
+      <artifactId>metron-enrichment</artifactId>
+      <version>0.1BETA</version>
+    </dependency>
+    <!-- NOTE(review): the metron-parsers dependency below is commented out; confirm whether it should be restored or removed -->
+    <!--dependency>
+      <groupId>org.apache.metron</groupId>
+      <artifactId>metron-parsers</artifactId>
+      <version>0.1BETA</version>
+    </dependency-->
+    <dependency>
+      <groupId>org.apache.metron</groupId>
+      <artifactId>metron-test-utilities</artifactId>
+      <version>0.1BETA</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+  </build>
+  <reporting>
+  </reporting>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
new file mode 100644
index 0000000..3f21c0d
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
+import com.google.common.base.Function;
+import org.apache.metron.TestConstants;
+import org.apache.metron.integration.components.KafkaWithZKComponent;
+import org.apache.metron.common.cli.ConfigurationsUtils;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Properties;
+
+public abstract class BaseIntegrationTest {
+
+  protected KafkaWithZKComponent getKafkaComponent(final Properties topologyProperties, List<KafkaWithZKComponent.Topic> topics) {
+    return new KafkaWithZKComponent().withTopics(topics)
+            .withPostStartCallback(new Function<KafkaWithZKComponent, Void>() {
+              @Nullable
+              @Override
+              public Void apply(@Nullable KafkaWithZKComponent kafkaWithZKComponent) {
+                topologyProperties.setProperty("kafka.zk", kafkaWithZKComponent.getZookeeperConnect());
+                try {
+                  ConfigurationsUtils.uploadConfigsToZookeeper(TestConstants.SAMPLE_CONFIG_PATH, kafkaWithZKComponent.getZookeeperConnect());
+                } catch (Exception e) {
+                  throw new IllegalStateException(e);
+                }
+                return null;
+              }
+            });
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ComponentRunner.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ComponentRunner.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ComponentRunner.java
new file mode 100644
index 0000000..c938741
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ComponentRunner.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Runs a named set of {@link InMemoryComponent}s: starts and stops them in a configurable
+ * order and polls a {@link Processor} until it reports readiness.
+ */
+public class ComponentRunner {
+    /**
+     * Fluent builder for {@link ComponentRunner}. Components keep their registration order,
+     * which is used for both startup and shutdown unless a custom order is supplied.
+     */
+    public static class Builder {
+        LinkedHashMap<String, InMemoryComponent> components;
+        String[] startupOrder;
+        String[] shutdownOrder;
+        long timeBetweenAttempts = 1000;
+        int numRetries = 5;
+        long maxTimeMS = 120000;
+        public Builder() {
+            components = new LinkedHashMap<String, InMemoryComponent>();
+        }
+
+        /** Sets the number of NOT_READY results tolerated by process(); a value of 0 or less disables the limit. */
+        public Builder withNumRetries(int numRetries) {
+            this.numRetries = numRetries;
+            return this;
+        }
+
+        /** Sets the overall time budget, in milliseconds, for a single process() call. */
+        public Builder withMaxTimeMS(long maxTimeMS) {
+            this.maxTimeMS = maxTimeMS;
+            return this;
+        }
+
+        /** Registers a component under the given name, replacing any component already using it. */
+        public Builder withComponent(String name, InMemoryComponent component) {
+            components.put(name, component);
+            return this;
+        }
+
+        /** Overrides the startup order; every entry must be the name of a registered component. */
+        public Builder withCustomStartupOrder(String[] startupOrder) {
+            this.startupOrder = startupOrder;
+            return this;
+        }
+
+        /** Overrides the shutdown order; every entry must be the name of a registered component. */
+        public Builder withCustomShutdownOrder(String[] shutdownOrder) {
+            this.shutdownOrder = shutdownOrder;
+            return this;
+        }
+
+        /** Sets how long process() sleeps between attempts, in milliseconds. */
+        public Builder withMillisecondsBetweenAttempts(long timeBetweenAttempts) {
+            this.timeBetweenAttempts = timeBetweenAttempts;
+            return this;
+        }
+
+        /** Returns the component names in registration order. */
+        private static String[] toOrderedList(Map<String, InMemoryComponent> components) {
+            return components.keySet().toArray(new String[components.size()]);
+        }
+
+        /** Builds the runner, defaulting any unset order to the registration order. */
+        public ComponentRunner build() {
+            if(shutdownOrder == null) {
+                shutdownOrder = toOrderedList(components);
+            }
+            if(startupOrder == null) {
+                startupOrder = toOrderedList(components);
+            }
+            return new ComponentRunner(components, startupOrder, shutdownOrder, timeBetweenAttempts, numRetries, maxTimeMS);
+        }
+
+    }
+
+    LinkedHashMap<String, InMemoryComponent> components;
+    String[] startupOrder;
+    String[] shutdownOrder;
+    long timeBetweenAttempts;
+    int numRetries;
+    long maxTimeMS;
+    public ComponentRunner( LinkedHashMap<String, InMemoryComponent> components
+                          , String[] startupOrder
+                          , String[] shutdownOrder
+                          , long timeBetweenAttempts
+                          , int numRetries
+                          , long maxTimeMS
+                          )
+    {
+        this.components = components;
+        this.startupOrder = startupOrder;
+        this.shutdownOrder = shutdownOrder;
+        this.timeBetweenAttempts = timeBetweenAttempts;
+        this.numRetries = numRetries;
+        this.maxTimeMS = maxTimeMS;
+    }
+
+    /**
+     * Returns the component registered under {@code name}, cast to {@code clazz}.
+     *
+     * @return the component, or null if no component has that name
+     * @throws ClassCastException if the component is not an instance of {@code clazz}
+     */
+    public <T extends InMemoryComponent> T getComponent(String name, Class<T> clazz) {
+        return clazz.cast(getComponents().get(name));
+    }
+
+    public LinkedHashMap<String, InMemoryComponent> getComponents() {
+        return components;
+    }
+
+    /**
+     * Looks up a component named in a startup/shutdown order, failing with a descriptive
+     * message (rather than a bare NullPointerException) when the name is not registered.
+     */
+    private InMemoryComponent requireComponent(String componentName) {
+        InMemoryComponent component = components.get(componentName);
+        if(component == null) {
+            throw new IllegalStateException("No component registered under name '" + componentName
+                                           + "'; known components: " + components.keySet());
+        }
+        return component;
+    }
+
+    /**
+     * Starts every component in startup order.
+     *
+     * @throws UnableToStartException if a component fails to start
+     * @throws IllegalStateException if the startup order names an unregistered component
+     */
+    public void start() throws UnableToStartException {
+        for(String componentName : startupOrder) {
+            requireComponent(componentName).start();
+        }
+    }
+
+    /**
+     * Stops every component in shutdown order.
+     *
+     * @throws IllegalStateException if the shutdown order names an unregistered component
+     */
+    public void stop() {
+        for(String componentName : shutdownOrder) {
+            requireComponent(componentName).stop();
+        }
+    }
+
+
+    /**
+     * Repeatedly invokes {@code successState} until it reports READY, sleeping
+     * {@code timeBetweenAttempts} milliseconds between attempts.
+     *
+     * @return the processor's result once it is READY
+     * @throws RuntimeException if {@code maxTimeMS} has elapsed before an attempt, if more than
+     *         {@code numRetries} NOT_READY results were seen (when {@code numRetries} is positive),
+     *         or if the thread is interrupted while sleeping
+     */
+    public <T> T process(Processor<T> successState) {
+        int retryCount = 0;
+        long start = System.currentTimeMillis();
+        while(true) {
+            long duration = System.currentTimeMillis() - start;
+            if(duration > maxTimeMS) {
+                throw new RuntimeException("Took too long to complete: " + duration + " > " + maxTimeMS);
+            }
+            ReadinessState state = successState.process(this);
+            if(state == ReadinessState.READY) {
+                return successState.getResult();
+            }
+            else if(state == ReadinessState.NOT_READY) {
+                retryCount++;
+                if(numRetries > 0 && retryCount > numRetries) {
+                    throw new RuntimeException("Too many retries: " + retryCount);
+                }
+            }
+            try {
+                Thread.sleep(timeBetweenAttempts);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so callers further up can still observe the interruption.
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("Unable to sleep", e);
+            }
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
new file mode 100644
index 0000000..21019c3
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
@@ -0,0 +1,468 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.*;
+
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.common.Constants;
+import org.apache.metron.TestConstants;
+import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.converter.EnrichmentHelper;
+import org.apache.metron.integration.utils.TestUtils;
+import org.apache.metron.test.utils.UnitTestHelper;
+import org.apache.metron.integration.components.FluxTopologyComponent;
+import org.apache.metron.integration.components.KafkaWithZKComponent;
+import org.apache.metron.integration.mock.MockGeoAdapter;
+import org.apache.metron.test.mock.MockHTable;
+import org.apache.metron.enrichment.lookup.LookupKV;
+
+import org.apache.metron.integration.utils.SampleUtil;
+import org.apache.metron.common.utils.JSONUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.Stack;
+
+public abstract class EnrichmentIntegrationTest extends BaseIntegrationTest {
+  private static final String SRC_IP = "ip_src_addr";
+  private static final String DST_IP = "ip_dst_addr";
+  private static final String MALICIOUS_IP_TYPE = "malicious_ip";
+  private static final String PLAYFUL_CLASSIFICATION_TYPE = "playful_classification";
+  private static final Map<String, String> PLAYFUL_ENRICHMENT = new HashMap<String, String>() {{
+    put("orientation", "north");
+  }};
+  private String fluxPath = "../metron-enrichment/src/main/flux/enrichment/test.yaml";
+  protected String hdfsDir = "target/enrichmentIntegrationTest/hdfs";
+  private String sampleParsedPath = TestConstants.SAMPLE_DATA_PARSED_PATH + "YafExampleParsed";
+  private String sampleIndexedPath = TestConstants.SAMPLE_DATA_INDEXED_PATH + "YafIndexed";
+
+
+  public static class Provider implements TableProvider, Serializable {
+    MockHTable.Provider  provider = new MockHTable.Provider();
+    @Override
+    public HTableInterface getTable(Configuration config, String tableName) throws IOException {
+      return provider.getTable(config, tableName);
+    }
+  }
+
+  public static void cleanHdfsDir(String hdfsDirStr) {
+    File hdfsDir = new File(hdfsDirStr);
+    Stack<File> fs = new Stack<>();
+    if(hdfsDir.exists()) {
+      fs.push(hdfsDir);
+      while(!fs.empty()) {
+        File f = fs.pop();
+        if (f.isDirectory()) {
+          for(File child : f.listFiles()) {
+            fs.push(child);
+          }
+        }
+        else {
+          if (f.getName().startsWith("enrichment") || f.getName().endsWith(".json")) {
+            f.delete();
+          }
+        }
+      }
+    }
+  }
+
+  public static List<Map<String, Object> > readDocsFromDisk(String hdfsDirStr) throws IOException {
+    List<Map<String, Object>> ret = new ArrayList<>();
+    File hdfsDir = new File(hdfsDirStr);
+    Stack<File> fs = new Stack<>();
+    if(hdfsDir.exists()) {
+      fs.push(hdfsDir);
+      while(!fs.empty()) {
+        File f = fs.pop();
+        if(f.isDirectory()) {
+          for (File child : f.listFiles()) {
+            fs.push(child);
+          }
+        }
+        else {
+          System.out.println("Processed " + f);
+          if (f.getName().startsWith("enrichment") || f.getName().endsWith(".json")) {
+            List<byte[]> data = TestUtils.readSampleData(f.getPath());
+            Iterables.addAll(ret, Iterables.transform(data, new Function<byte[], Map<String, Object>>() {
+              @Nullable
+              @Override
+              public Map<String, Object> apply(@Nullable byte[] bytes) {
+                String s = new String(bytes);
+                try {
+                  return JSONUtils.INSTANCE.load(s, new TypeReference<Map<String, Object>>() {
+                  });
+                } catch (IOException e) {
+                  throw new RuntimeException(e);
+                }
+              }
+            }));
+          }
+        }
+      }
+    }
+    return ret;
+  }
+
+
+  @Test
+  public void test() throws Exception {
+    cleanHdfsDir(hdfsDir);
+    final Configurations configurations = SampleUtil.getSampleConfigs();
+    final String dateFormat = "yyyy.MM.dd.HH";
+    final List<byte[]> inputMessages = TestUtils.readSampleData(sampleParsedPath);
+    final String cf = "cf";
+    final String trackerHBaseTableName = "tracker";
+    final String threatIntelTableName = "threat_intel";
+    final String enrichmentsTableName = "enrichments";
+    final Properties topologyProperties = new Properties() {{
+      setProperty("org.apache.metron.enrichment.host.known_hosts", "[{\"ip\":\"10.1.128.236\", \"local\":\"YES\", \"type\":\"webserver\", \"asset_value\" : \"important\"},\n" +
+              "{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\" : \"important\"},\n" +
+              "{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"},\n" +
+              "{\"ip\":\"10.0.2.15\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}]");
+      setProperty("hbase.provider.impl","" + Provider.class.getName());
+      setProperty("threat.intel.tracker.table", trackerHBaseTableName);
+      setProperty("threat.intel.tracker.cf", cf);
+      setProperty("threat.intel.simple.hbase.table", threatIntelTableName);
+      setProperty("threat.intel.simple.hbase.cf", cf);
+      setProperty("enrichment.simple.hbase.table", enrichmentsTableName);
+      setProperty("enrichment.simple.hbase.cf", cf);
+      setProperty("es.clustername", "metron");
+      setProperty("es.port", "9300");
+      setProperty("es.ip", "localhost");
+      setProperty("index.date.format", dateFormat);
+      setProperty("index.hdfs.output", hdfsDir);
+    }};
+    setAdditionalProperties(topologyProperties);
+    final KafkaWithZKComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaWithZKComponent.Topic>() {{
+      add(new KafkaWithZKComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
+    }});
+
+    //create MockHBaseTables
+    final MockHTable trackerTable = (MockHTable)MockHTable.Provider.addToCache(trackerHBaseTableName, cf);
+    final MockHTable threatIntelTable = (MockHTable)MockHTable.Provider.addToCache(threatIntelTableName, cf);
+    EnrichmentHelper.INSTANCE.load(threatIntelTable, cf, new ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>(){{
+      add(new LookupKV<>(new EnrichmentKey(MALICIOUS_IP_TYPE, "10.0.2.3"), new EnrichmentValue(new HashMap<String, String>())));
+    }});
+    final MockHTable enrichmentTable = (MockHTable)MockHTable.Provider.addToCache(enrichmentsTableName, cf);
+    EnrichmentHelper.INSTANCE.load(enrichmentTable, cf, new ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>(){{
+      add(new LookupKV<>(new EnrichmentKey(PLAYFUL_CLASSIFICATION_TYPE, "10.0.2.3")
+                        , new EnrichmentValue(PLAYFUL_ENRICHMENT )
+                        )
+         );
+    }});
+    FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
+            .withTopologyLocation(new File(fluxPath))
+            .withTopologyName("test")
+            .withTopologyProperties(topologyProperties)
+            .build();
+
+    InMemoryComponent searchComponent = getSearchComponent(topologyProperties);
+
+    UnitTestHelper.verboseLogging();
+    ComponentRunner runner = new ComponentRunner.Builder()
+            .withComponent("kafka", kafkaComponent)
+            .withComponent("search", searchComponent)
+            .withComponent("storm", fluxComponent)
+            .withMillisecondsBetweenAttempts(10000)
+            .withNumRetries(10)
+            .build();
+    runner.start();
+
+    try {
+      fluxComponent.submitTopology();
+
+      kafkaComponent.writeMessages(Constants.ENRICHMENT_TOPIC, inputMessages);
+      List<Map<String, Object>> docs = runner.process(getProcessor(inputMessages));
+      Assert.assertEquals(inputMessages.size(), docs.size());
+      List<Map<String, Object>> cleanedDocs = cleanDocs(docs);
+      validateAll(cleanedDocs);
+
+
+      List<Map<String, Object>> docsFromDisk = readDocsFromDisk(hdfsDir);
+      Assert.assertEquals(docsFromDisk.size(), docs.size()) ;
+      Assert.assertEquals(new File(hdfsDir).list().length, 1);
+      Assert.assertEquals(new File(hdfsDir).list()[0], "yaf");
+      validateAll(docsFromDisk);
+    }
+    finally {
+      cleanHdfsDir(hdfsDir);
+      runner.stop();
+    }
+  }
+
+  public List<Map<String, Object>> cleanDocs(List<Map<String, Object>> docs) {
+    List<Map<String, Object>> cleanedDocs = new ArrayList<>();
+    for(Map<String, Object> doc: docs) {
+      Map<String, Object> cleanedFields = new HashMap<>();
+      for(String field: doc.keySet()) {
+        cleanedFields.put(cleanField(field), doc.get(field));
+      }
+      cleanedDocs.add(cleanedFields);
+    }
+    return cleanedDocs;
+  }
+
+  public static void validateAll(List<Map<String, Object>> docs) {
+    for (Map<String, Object> doc : docs) {
+      baseValidation(doc);
+      hostEnrichmentValidation(doc);
+      geoEnrichmentValidation(doc);
+      threatIntelValidation(doc);
+      simpleEnrichmentValidation(doc);
+    }
+  }
+
+  public static void baseValidation(Map<String, Object> jsonDoc) {
+    assertEnrichmentsExists("threatintels.", setOf("hbaseThreatIntel"), jsonDoc.keySet());
+    assertEnrichmentsExists("enrichments.", setOf("geo", "host", "hbaseEnrichment" ), jsonDoc.keySet());
+    for(Map.Entry<String, Object> kv : jsonDoc.entrySet()) {
+      //ensure no values are empty.
+      Assert.assertTrue(kv.getValue().toString().length() > 0);
+    }
+    //ensure we always have a source ip and destination ip
+    Assert.assertNotNull(jsonDoc.get(SRC_IP));
+    Assert.assertNotNull(jsonDoc.get(DST_IP));
+  }
+
+  private static class EvaluationPayload {
+    Map<String, Object> indexedDoc;
+    String key;
+    public EvaluationPayload(Map<String, Object> indexedDoc, String key) {
+      this.indexedDoc = indexedDoc;
+      this.key = key;
+    }
+  }
+
+  private static enum HostEnrichments implements Predicate<EvaluationPayload>{
+    LOCAL_LOCATION(new Predicate<EvaluationPayload>() {
+
+      @Override
+      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
+        return evaluationPayload.indexedDoc.get("enrichments.host." + evaluationPayload.key + ".known_info.local").equals("YES");
+      }
+    })
+    ,UNKNOWN_LOCATION(new Predicate<EvaluationPayload>() {
+
+      @Override
+      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
+        return evaluationPayload.indexedDoc.get("enrichments.host." + evaluationPayload.key + ".known_info.local").equals("UNKNOWN");
+      }
+    })
+    ,IMPORTANT(new Predicate<EvaluationPayload>() {
+      @Override
+      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
+        return evaluationPayload.indexedDoc.get("enrichments.host." + evaluationPayload.key + ".known_info.asset_value").equals("important");
+      }
+    })
+    ,PRINTER_TYPE(new Predicate<EvaluationPayload>() {
+      @Override
+      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
+        return evaluationPayload.indexedDoc.get("enrichments.host." + evaluationPayload.key + ".known_info.type").equals("printer");
+      }
+    })
+    ,WEBSERVER_TYPE(new Predicate<EvaluationPayload>() {
+      @Override
+      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
+        return evaluationPayload.indexedDoc.get("enrichments.host." + evaluationPayload.key + ".known_info.type").equals("webserver");
+      }
+    })
+    ,UNKNOWN_TYPE(new Predicate<EvaluationPayload>() {
+      @Override
+      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
+        return evaluationPayload.indexedDoc.get("enrichments.host." + evaluationPayload.key + ".known_info.type").equals("unknown");
+      }
+    })
+    ;
+
+    Predicate<EvaluationPayload> _predicate;
+    HostEnrichments(Predicate<EvaluationPayload> predicate) {
+      this._predicate = predicate;
+    }
+
+    public boolean apply(EvaluationPayload payload) {
+      return _predicate.apply(payload);
+    }
+
+  }
+
+  private static void assertEnrichmentsExists(String topLevel, Set<String> expectedEnrichments, Set<String> keys) {
+    for(String key : keys) {
+      if(key.startsWith(topLevel)) {
+        String secondLevel = Iterables.get(Splitter.on(".").split(key), 1);
+        String message = "Found an enrichment/threat intel (" + secondLevel + ") that I didn't expect (expected enrichments :"
+                       + Joiner.on(",").join(expectedEnrichments) + "), but it was not there.  If you've created a new"
+                       + " enrichment, then please add a validation method to this unit test.  Otherwise, it's a solid error"
+                       + " and should be investigated.";
+        Assert.assertTrue( message, expectedEnrichments.contains(secondLevel));
+      }
+    }
+  }
+  private static void simpleEnrichmentValidation(Map<String, Object> indexedDoc) {
+    if(indexedDoc.get(SRC_IP).equals("10.0.2.3")
+            || indexedDoc.get(DST_IP).equals("10.0.2.3")
+            ) {
+      Assert.assertTrue(keyPatternExists("enrichments.hbaseEnrichment", indexedDoc));
+      if(indexedDoc.get(SRC_IP).equals("10.0.2.3")) {
+        Assert.assertEquals(indexedDoc.get("enrichments.hbaseEnrichment." + SRC_IP + "." + PLAYFUL_CLASSIFICATION_TYPE+ ".orientation")
+                , PLAYFUL_ENRICHMENT.get("orientation")
+        );
+      }
+      else if(indexedDoc.get(DST_IP).equals("10.0.2.3")) {
+        Assert.assertEquals( indexedDoc.get("enrichments.hbaseEnrichment." + DST_IP + "." + PLAYFUL_CLASSIFICATION_TYPE + ".orientation")
+                , PLAYFUL_ENRICHMENT.get("orientation")
+        );
+      }
+    }
+
+  }
+  private static void threatIntelValidation(Map<String, Object> indexedDoc) {
+    if(indexedDoc.get(SRC_IP).equals("10.0.2.3")
+    || indexedDoc.get(DST_IP).equals("10.0.2.3")
+            ) {
+      //if we have any threat intel messages, we want to tag is_alert to true
+      Assert.assertTrue(keyPatternExists("threatintels.", indexedDoc));
+      Assert.assertEquals(indexedDoc.get("is_alert"), "true");
+    }
+    else {
+      //For YAF this is the case, but if we do snort later on, this will be invalid.
+      Assert.assertNull(indexedDoc.get("is_alert"));
+      Assert.assertFalse(keyPatternExists("threatintels.", indexedDoc));
+    }
+    //ip threat intels
+    if(keyPatternExists("threatintels.hbaseThreatIntel.", indexedDoc)) {
+      if(indexedDoc.get(SRC_IP).equals("10.0.2.3")) {
+        Assert.assertEquals(indexedDoc.get("threatintels.hbaseThreatIntel." + SRC_IP + "." + MALICIOUS_IP_TYPE), "alert");
+      }
+      else if(indexedDoc.get(DST_IP).equals("10.0.2.3")) {
+        Assert.assertEquals(indexedDoc.get("threatintels.hbaseThreatIntel." + DST_IP + "." + MALICIOUS_IP_TYPE), "alert");
+      }
+      else {
+        Assert.fail("There was a threat intels that I did not expect: " + indexedDoc);
+      }
+    }
+
+  }
+
+  private static void geoEnrichmentValidation(Map<String, Object> indexedDoc) {
+    //should have geo enrichment on every message due to mock geo adapter
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".location_point"), MockGeoAdapter.DEFAULT_LOCATION_POINT);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP +".location_point"), MockGeoAdapter.DEFAULT_LOCATION_POINT);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".longitude"), MockGeoAdapter.DEFAULT_LONGITUDE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".longitude"), MockGeoAdapter.DEFAULT_LONGITUDE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".city"), MockGeoAdapter.DEFAULT_CITY);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".city"), MockGeoAdapter.DEFAULT_CITY);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".latitude"), MockGeoAdapter.DEFAULT_LATITUDE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".latitude"), MockGeoAdapter.DEFAULT_LATITUDE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".country"), MockGeoAdapter.DEFAULT_COUNTRY);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".country"), MockGeoAdapter.DEFAULT_COUNTRY);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".dmaCode"), MockGeoAdapter.DEFAULT_DMACODE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".dmaCode"), MockGeoAdapter.DEFAULT_DMACODE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".postalCode"), MockGeoAdapter.DEFAULT_POSTAL_CODE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".postalCode"), MockGeoAdapter.DEFAULT_POSTAL_CODE);
+  }
+
+  private static void hostEnrichmentValidation(Map<String, Object> indexedDoc) {
+    boolean enriched = false;
+    //important local printers
+    {
+      Set<String> ips = setOf("10.0.2.15", "10.60.10.254");
+      if (ips.contains(indexedDoc.get(SRC_IP))) {
+        //this is a local, important, printer
+        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
+                ,HostEnrichments.IMPORTANT
+                ,HostEnrichments.PRINTER_TYPE
+                ).apply(new EvaluationPayload(indexedDoc, SRC_IP))
+        );
+        enriched = true;
+      }
+      if (ips.contains(indexedDoc.get(DST_IP))) {
+        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
+                ,HostEnrichments.IMPORTANT
+                ,HostEnrichments.PRINTER_TYPE
+                ).apply(new EvaluationPayload(indexedDoc, DST_IP))
+        );
+        enriched = true;
+      }
+    }
+    //important local webservers
+    {
+      Set<String> ips = setOf("10.1.128.236");
+      if (ips.contains(indexedDoc.get(SRC_IP))) {
+        //this is a local, important, printer
+        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
+                ,HostEnrichments.IMPORTANT
+                ,HostEnrichments.WEBSERVER_TYPE
+                ).apply(new EvaluationPayload(indexedDoc, SRC_IP))
+        );
+        enriched = true;
+      }
+      if (ips.contains(indexedDoc.get(DST_IP))) {
+        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
+                ,HostEnrichments.IMPORTANT
+                ,HostEnrichments.WEBSERVER_TYPE
+                ).apply(new EvaluationPayload(indexedDoc, DST_IP))
+        );
+        enriched = true;
+      }
+    }
+    if(!enriched) {
+      Assert.assertFalse(keyPatternExists("enrichments.host", indexedDoc));
+    }
+  }
+
+
+  private static boolean keyPatternExists(String pattern, Map<String, Object> indexedObj) {
+    for(String k : indexedObj.keySet()) {
+      if(k.startsWith(pattern)) {
+        return true;
+      }
+    }
+    return false;
+  }
+  private static Set<String> setOf(String... items) {
+    Set<String> ret = new HashSet<>();
+    for(String item : items) {
+      ret.add(item);
+    }
+    return ret;
+  }
+
+  abstract public InMemoryComponent getSearchComponent(Properties topologyProperties) throws Exception;
+  abstract public Processor<List<Map<String, Object>>> getProcessor(List<byte[]> inputMessages);
+  abstract public void setAdditionalProperties(Properties topologyProperties);
+  abstract public String cleanField(String field);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/InMemoryComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/InMemoryComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/InMemoryComponent.java
new file mode 100644
index 0000000..8a9ee96
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/InMemoryComponent.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
+/**
+ * Lifecycle contract for a component that an integration test can start and stop.
+ */
+public interface InMemoryComponent {
+
+    /**
+     * Starts the component.
+     *
+     * @throws UnableToStartException if the component cannot be started
+     */
+    void start() throws UnableToStartException;
+
+    /**
+     * Stops the component.
+     */
+    void stop();
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/Processor.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/Processor.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/Processor.java
new file mode 100644
index 0000000..bbcfb73
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/Processor.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
+/**
+ * A processing step that reports a {@link ReadinessState} and exposes a result.
+ *
+ * @param <T> type of the result returned by {@link #getResult()}
+ */
+public interface Processor<T> {
+    /**
+     * Runs one processing attempt against the given runner.
+     * NOTE(review): presumably called repeatedly until it reports READY — confirm in ComponentRunner.
+     *
+     * @param runner the component runner passed to this processor
+     * @return the readiness state reported by this attempt
+     */
+    ReadinessState process(ComponentRunner runner);
+    /**
+     * Returns the result held by this processor.
+     *
+     * @return the result of type {@code T}
+     */
+    T getResult();
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ReadinessState.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ReadinessState.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ReadinessState.java
new file mode 100644
index 0000000..5cdfbb4
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ReadinessState.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
+public enum ReadinessState {
+    READY, NOT_READY;
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/UnableToStartException.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/UnableToStartException.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/UnableToStartException.java
new file mode 100644
index 0000000..0fcda14
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/UnableToStartException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
+/**
+ * Checked exception declared by {@code InMemoryComponent.start()} for start-up failures.
+ */
+public class UnableToStartException extends Exception {
+
+    /**
+     * Creates the exception with a description only.
+     *
+     * @param message description of the failure
+     */
+    public UnableToStartException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates the exception with a description and the underlying cause.
+     *
+     * @param message description of the failure
+     * @param t       the cause to preserve
+     */
+    public UnableToStartException(String message, Throwable t) {
+        super(message, t);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
new file mode 100644
index 0000000..3bb0c56
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.components;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.StormTopology;
+import org.apache.metron.integration.InMemoryComponent;
+import org.apache.metron.integration.UnableToStartException;
+import org.apache.storm.flux.FluxBuilder;
+import org.apache.storm.flux.model.ExecutionContext;
+import org.apache.storm.flux.model.TopologyDef;
+import org.apache.storm.flux.parser.FluxParser;
+import org.apache.thrift7.TException;
+import org.junit.Assert;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Properties;
+
+/**
+ * In-memory test component that runs a Flux-defined Storm topology on a {@link LocalCluster}.
+ *
+ * <p>{@link #start()} only creates the local cluster; the topology is built from the Flux YAML
+ * file and submitted to that cluster by {@link #submitTopology()}.
+ */
+public class FluxTopologyComponent implements InMemoryComponent {
+    LocalCluster stormCluster;
+    String topologyName;
+    File topologyLocation;
+    Properties topologyProperties;
+
+    /** Fluent builder collecting the constructor arguments of {@link FluxTopologyComponent}. */
+    public static class Builder {
+        String topologyName;
+        File topologyLocation;
+        Properties topologyProperties;
+
+        /**
+         * @param name name under which the topology is submitted
+         * @return this builder
+         */
+        public Builder withTopologyName(String name) {
+            this.topologyName = name;
+            return this;
+        }
+
+        /**
+         * @param location Flux YAML file describing the topology
+         * @return this builder
+         */
+        public Builder withTopologyLocation(File location) {
+            this.topologyLocation = location;
+            return this;
+        }
+
+        /**
+         * @param properties properties written to a temporary file and passed to the Flux parser
+         * @return this builder
+         */
+        public Builder withTopologyProperties(Properties properties) {
+            this.topologyProperties = properties;
+            return this;
+        }
+
+        /**
+         * @return a component configured with the values collected so far
+         */
+        public FluxTopologyComponent build() {
+            return new FluxTopologyComponent(topologyName, topologyLocation, topologyProperties);
+        }
+    }
+
+    /**
+     * @param topologyName       name under which the topology is submitted
+     * @param topologyLocation   Flux YAML file describing the topology
+     * @param topologyProperties properties handed to the Flux parser
+     */
+    public FluxTopologyComponent(String topologyName, File topologyLocation, Properties topologyProperties) {
+        this.topologyName = topologyName;
+        this.topologyLocation = topologyLocation;
+        this.topologyProperties = topologyProperties;
+    }
+
+    /**
+     * @return the local cluster, or {@code null} if {@link #start()} has not succeeded yet
+     */
+    public LocalCluster getStormCluster() {
+        return stormCluster;
+    }
+
+    public String getTopologyName() {
+        return topologyName;
+    }
+
+    public File getTopologyLocation() {
+        return topologyLocation;
+    }
+
+    public Properties getTopologyProperties() {
+        return topologyProperties;
+    }
+
+    /**
+     * Creates the local Storm cluster. The topology is not submitted here.
+     *
+     * @throws UnableToStartException if the cluster cannot be created; the cause is preserved
+     */
+    @Override
+    public void start() throws UnableToStartException{
+        try {
+            stormCluster = new LocalCluster();
+        } catch (Exception e) {
+            throw new UnableToStartException("Unable to start flux topology: " + getTopologyLocation(), e);
+        }
+    }
+
+    /**
+     * Shuts the local cluster down. Does nothing when the cluster was never created, so it is
+     * safe to call from test teardown even after a failed {@link #start()}.
+     */
+    @Override
+    public void stop() {
+        if (stormCluster != null) {
+            stormCluster.shutdown();
+        }
+    }
+
+    /**
+     * Builds the topology from the configured Flux file and properties and submits it to the
+     * local cluster. {@link #start()} must have succeeded first.
+     */
+    public void submitTopology() throws NoSuchMethodException, IOException, InstantiationException, TException, IllegalAccessException, InvocationTargetException, ClassNotFoundException {
+        startTopology(getTopologyName(), getTopologyLocation(), getTopologyProperties());
+    }
+
+    /** Parses the Flux definition, builds and validates the topology, then submits it. */
+    private void startTopology(String topologyName, File topologyLoc, Properties properties) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, TException {
+        TopologyDef topologyDef = loadYaml(topologyName, topologyLoc, properties);
+        Config conf = FluxBuilder.buildConfig(topologyDef);
+        ExecutionContext context = new ExecutionContext(topologyDef, conf);
+        StormTopology topology = FluxBuilder.buildTopology(context);
+        Assert.assertNotNull(topology);
+        topology.validate();
+        stormCluster.submitTopology(topologyName, conf, topology);
+    }
+
+    /**
+     * Writes the properties to a temporary file (deleted on JVM exit) and parses the Flux YAML
+     * file with that properties file applied.
+     *
+     * @throws IOException if the temporary file cannot be created or written, or parsing fails
+     */
+    private static TopologyDef loadYaml(String topologyName, File yamlFile, Properties properties) throws IOException {
+        File tmpFile = File.createTempFile(topologyName, "props");
+        tmpFile.deleteOnExit();
+        // Close the writer before parsing, and let any write failure propagate: the previous
+        // version returned from inside 'finally', which silently discarded such exceptions.
+        try (FileWriter propWriter = new FileWriter(tmpFile)) {
+            properties.store(propWriter, topologyName + " properties");
+        }
+        return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, tmpFile.getAbsolutePath(), false);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
new file mode 100644
index 0000000..fb7bcde
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.components;
+
+
+import com.google.common.base.Function;
+import kafka.admin.AdminUtils;
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.message.MessageAndOffset;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.*;
+import kafka.zk.EmbeddedZookeeper;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.metron.integration.InMemoryComponent;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+
+/**
+ * In-memory test component that runs one Kafka broker, plus an embedded ZooKeeper unless an
+ * existing connect string was supplied via {@link #withExistingZookeeper(String)}.
+ *
+ * <p>Configure it with the {@code with*} methods, then call {@link #start()}. {@link #stop()}
+ * shuts down the broker, the ZooKeeper client and, if one was started, the embedded ZooKeeper.
+ */
+public class KafkaWithZKComponent implements InMemoryComponent {
+
+
+  /** Name and partition count of a topic created during {@link #start()}. */
+  public static class Topic {
+    public int numPartitions;
+    public String name;
+
+    public Topic(String name, int numPartitions) {
+      this.numPartitions = numPartitions;
+      this.name = name;
+    }
+  }
+  private transient KafkaServer kafkaServer;
+  private transient EmbeddedZookeeper zkServer;
+  private transient ZkClient zkClient;
+  private transient ConsumerConnector consumer;
+  private String zookeeperConnectString;
+  private int brokerPort = 6667;
+  private List<Topic> topics = Collections.emptyList();
+  private Function<KafkaWithZKComponent, Void> postStartCallback;
+
+  /**
+   * Registers a callback invoked with this component at the end of {@link #start()}.
+   *
+   * @param f the callback; optional, {@link #start()} skips it when none was set
+   * @return this component
+   */
+  public KafkaWithZKComponent withPostStartCallback(Function<KafkaWithZKComponent, Void> f) {
+    postStartCallback = f;
+    return this;
+  }
+
+  /**
+   * Uses an already-running ZooKeeper instead of starting an embedded one.
+   *
+   * @param zookeeperConnectString connect string of the existing ZooKeeper
+   * @return this component
+   */
+  public KafkaWithZKComponent withExistingZookeeper(String zookeeperConnectString) {
+    this.zookeeperConnectString = zookeeperConnectString;
+    return this;
+  }
+
+  /**
+   * Sets the broker port; a non-positive value asks {@code TestUtils.choosePort()} for one.
+   *
+   * @param brokerPort the port to use, or a value &lt;= 0 to have one chosen
+   * @return this component
+   */
+  public KafkaWithZKComponent withBrokerPort(int brokerPort) {
+    if(brokerPort <= 0)
+    {
+      brokerPort = TestUtils.choosePort();
+    }
+    this.brokerPort = brokerPort;
+    return this;
+  }
+
+  /**
+   * @param topics topics to create during {@link #start()}
+   * @return this component
+   */
+  public KafkaWithZKComponent withTopics(List<Topic> topics) {
+    this.topics = topics;
+    return this;
+  }
+
+  public List<Topic> getTopics() {
+    return topics;
+  }
+
+  public int getBrokerPort() {
+    return brokerPort;
+  }
+
+
+  /**
+   * @return the broker address as {@code localhost:<brokerPort>}
+   */
+  public String getBrokerList()  {
+    return "localhost:" + brokerPort;
+  }
+
+  /**
+   * @return a producer built from the default configuration only
+   */
+  public KafkaProducer<String, byte[]> createProducer()
+  {
+    return createProducer(new HashMap<String, Object>());
+  }
+
+  /**
+   * Creates a producer pointed at this component's broker.
+   *
+   * @param properties extra producer settings; applied last, so they override the defaults
+   * @return the new producer; the caller is responsible for closing it
+   */
+  public KafkaProducer<String, byte[]> createProducer(Map<String, Object> properties)
+  {
+    Map<String, Object> producerConfig = new HashMap<>();
+    producerConfig.put("bootstrap.servers", getBrokerList());
+    // NOTE(review): the key type is declared as String but the key serializer is the byte-array
+    // one; writeMessages() sends no key, so confirm before sending keyed records.
+    producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+    producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+    producerConfig.put("request.required.acks", "-1");
+    producerConfig.put("fetch.message.max.bytes", ""+ 1024*1024*10);
+    producerConfig.put("replica.fetch.max.bytes", "" + 1024*1024*10);
+    producerConfig.put("message.max.bytes", "" + 1024*1024*10);
+    producerConfig.put("message.send.max.retries", "10");
+    producerConfig.putAll(properties);
+    return new KafkaProducer<>(producerConfig);
+  }
+
+  /**
+   * Starts ZooKeeper (unless an existing one was configured) and the broker, creates the
+   * configured topics, and finally invokes the post-start callback if one was registered.
+   *
+   * @throws RuntimeException if topic creation is interrupted
+   */
+  @Override
+  public void start() {
+    // setup Zookeeper
+    if(zookeeperConnectString == null) {
+      String zkConnect = TestZKUtils.zookeeperConnect();
+      zkServer = new EmbeddedZookeeper(zkConnect);
+      zookeeperConnectString = zkServer.connectString();
+    }
+    zkClient = new ZkClient(zookeeperConnectString, 30000, 30000, ZKStringSerializer$.MODULE$);
+
+    // setup Broker
+    // NOTE(review): the broker config is built without reference to zookeeperConnectString;
+    // confirm the broker uses the same ZooKeeper when withExistingZookeeper() was supplied.
+    Properties props = TestUtils.createBrokerConfig(0, brokerPort, true);
+    KafkaConfig config = new KafkaConfig(props);
+    Time mock = new MockTime();
+    kafkaServer = TestUtils.createServer(config, mock);
+    for(Topic topic : getTopics()) {
+      try {
+        createTopic(topic.name, topic.numPartitions, true);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so callers further up can still observe the interruption.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("Unable to create topic", e);
+      }
+    }
+    // The callback is optional; calling it unconditionally threw a NullPointerException.
+    if(postStartCallback != null) {
+      postStartCallback.apply(this);
+    }
+  }
+
+  /**
+   * @return the ZooKeeper connect string in use, or {@code null} before {@link #start()} when
+   *         no existing ZooKeeper was configured
+   */
+  public String getZookeeperConnect() {
+    return zookeeperConnectString;
+  }
+
+  /**
+   * Shuts down whatever {@link #start()} managed to create; safe after a partial or missing start.
+   */
+  @Override
+  public void stop() {
+    if(kafkaServer != null) {
+      kafkaServer.shutdown();
+    }
+    if(zkClient != null) {
+      zkClient.close();
+    }
+    if(zkServer != null) {
+      zkServer.shutdown();
+    }
+
+  }
+
+  /**
+   * Reads messages from partition 0 of a topic, starting at offset 0, with a single fetch
+   * request against this component's broker port.
+   *
+   * @param topic the topic to read
+   * @return the payload bytes of each message returned by that fetch
+   */
+  public List<byte[]> readMessages(String topic) {
+    // Connect to the configured port (previously hard-coded to 6667) and always close the consumer.
+    SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", brokerPort, 100000, 64 * 1024, "consumer");
+    try {
+      FetchRequest req = new FetchRequestBuilder()
+              .clientId("consumer")
+              .addFetch(topic, 0, 0, 100000)
+              .build();
+      FetchResponse fetchResponse = simpleConsumer.fetch(req);
+      Iterator<MessageAndOffset> results = fetchResponse.messageSet(topic, 0).iterator();
+      List<byte[]> messages = new ArrayList<>();
+      while(results.hasNext()) {
+        ByteBuffer payload = results.next().message().payload();
+        // remaining() is the number of bytes get() will copy, whatever the buffer's position.
+        byte[] bytes = new byte[payload.remaining()];
+        payload.get(bytes);
+        messages.add(bytes);
+      }
+      return messages;
+    } finally {
+      simpleConsumer.close();
+    }
+  }
+
+  /**
+   * Same as {@link #getStreamIterator(String, String, String)} with group {@code "group0"} and
+   * consumer name {@code "consumer0"}.
+   */
+  public ConsumerIterator<byte[], byte[]> getStreamIterator(String topic) {
+    return getStreamIterator(topic, "group0", "consumer0");
+  }
+
+  /**
+   * Creates a consumer connector with one stream for the topic and returns that stream's iterator.
+   * The connector is remembered so {@link #shutdownConsumer()} can close it.
+   *
+   * @param topic        the topic to consume
+   * @param group        consumer group id
+   * @param consumerName consumer id
+   * @return iterator over the topic's single stream
+   */
+  public ConsumerIterator<byte[], byte[]> getStreamIterator(String topic, String group, String consumerName) {
+    // setup simple consumer
+    // Use the stored connect string: zkServer is null when an existing ZooKeeper was configured.
+    Properties consumerProperties = TestUtils.createConsumerProperties(zookeeperConnectString, group, consumerName, -1);
+    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties));
+    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
+    topicCountMap.put(topic, 1);
+    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+    KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
+    ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
+    return iterator;
+  }
+
+  /**
+   * Shuts down the connector created by the most recent {@code getStreamIterator} call, if any.
+   */
+  public void shutdownConsumer() {
+    if(consumer != null) {
+      consumer.shutdown();
+    }
+  }
+
+  /**
+   * Creates a single-partition topic and waits for its metadata to propagate.
+   */
+  public void createTopic(String name) throws InterruptedException {
+    createTopic(name, 1, true);
+  }
+
+  /**
+   * Waits, partition by partition, until this broker has metadata for the topic.
+   *
+   * @param topic         the topic to wait for
+   * @param numPartitions number of partitions (0 .. numPartitions-1) to wait on
+   */
+  public void waitUntilMetadataIsPropagated(String topic, int numPartitions) {
+    List<KafkaServer> servers = new ArrayList<>();
+    servers.add(kafkaServer);
+    for(int part = 0;part < numPartitions;++part) {
+      TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, part, 5000);
+    }
+  }
+
+  /**
+   * Creates a topic with replication factor 1.
+   *
+   * @param name                          topic name
+   * @param numPartitions                 number of partitions
+   * @param waitUntilMetadataIsPropagated whether to block until the broker has the topic metadata
+   */
+  public void createTopic(String name, int numPartitions, boolean waitUntilMetadataIsPropagated) throws InterruptedException {
+    AdminUtils.createTopic(zkClient, name, numPartitions, 1, new Properties());
+    if(waitUntilMetadataIsPropagated) {
+      waitUntilMetadataIsPropagated(name, numPartitions);
+    }
+  }
+
+  /**
+   * Sends each message, without a key, to the topic using a short-lived producer.
+   *
+   * @param topic    destination topic
+   * @param messages payloads to send
+   */
+  public void writeMessages(String topic, Collection<byte[]> messages) {
+    KafkaProducer<String, byte[]> kafkaProducer = createProducer();
+    try {
+      for(byte[] message: messages) {
+        kafkaProducer.send(new ProducerRecord<String, byte[]>(topic, message));
+      }
+    } finally {
+      // Close even when a send fails so the producer's resources are not leaked.
+      kafkaProducer.close();
+    }
+  }
+}
\ No newline at end of file



Mime
View raw message