metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject [1/4] metron git commit: METRON-1051: Enable the ability to update indexed messages closes apache/incubator-metron#666
Date Mon, 07 Aug 2017 18:14:57 GMT
Repository: metron
Updated Branches:
  refs/heads/master 3d4751315 -> 813adf2da


http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/IndexingDaoIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/IndexingDaoIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/IndexingDaoIntegrationTest.java
deleted file mode 100644
index eb809f9..0000000
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/IndexingDaoIntegrationTest.java
+++ /dev/null
@@ -1,475 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.indexing.dao;
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.utils.JSONUtils;
-import org.apache.metron.indexing.dao.search.FieldType;
-import org.apache.metron.indexing.dao.search.InvalidSearchException;
-import org.apache.metron.indexing.dao.search.SearchRequest;
-import org.apache.metron.indexing.dao.search.SearchResponse;
-import org.apache.metron.indexing.dao.search.SearchResult;
-import org.apache.metron.integration.InMemoryComponent;
-import org.junit.*;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public abstract class IndexingDaoIntegrationTest {
-  /**
-   * [
-   * {"source:type": "bro", "ip_src_addr":"192.168.1.1", "ip_src_port": 8010, "long_field": 10000, "timestamp":1, "latitude": 48.5839, "double_field": 1.00001, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 1", "duplicate_name_field": "data 1"},
-   * {"source:type": "bro", "ip_src_addr":"192.168.1.2", "ip_src_port": 8009, "long_field": 20000, "timestamp":2, "latitude": 48.0001, "double_field": 1.00002, "is_alert":false, "location_point": "48.5839,7.7455", "bro_field": "bro data 2", "duplicate_name_field": "data 2"},
-   * {"source:type": "bro", "ip_src_addr":"192.168.1.3", "ip_src_port": 8008, "long_field": 10000, "timestamp":3, "latitude": 48.5839, "double_field": 1.00002, "is_alert":true, "location_point": "50.0,7.7455", "bro_field": "bro data 3", "duplicate_name_field": "data 3"},
-   * {"source:type": "bro", "ip_src_addr":"192.168.1.4", "ip_src_port": 8007, "long_field": 10000, "timestamp":4, "latitude": 48.5839, "double_field": 1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 4", "duplicate_name_field": "data 4"},
-   * {"source:type": "bro", "ip_src_addr":"192.168.1.5", "ip_src_port": 8006, "long_field": 10000, "timestamp":5, "latitude": 48.5839, "double_field": 1.00001, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 5", "duplicate_name_field": "data 5"}
-   * ]
-   */
-  @Multiline
-  public static String broData;
-
-  /**
-   * [
-   * {"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 8005, "long_field": 10000, "timestamp":6, "latitude": 48.5839, "double_field": 1.00001, "is_alert":false, "location_point": "50.0,7.7455", "snort_field": 10, "duplicate_name_field": 1},
-   * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "double_field": 1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 20, "duplicate_name_field": 2},
-   * {"source:type": "snort", "ip_src_addr":"192.168.1.7", "ip_src_port": 8003, "long_field": 10000, "timestamp":8, "latitude": 48.5839, "double_field": 1.00001, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 30, "duplicate_name_field": 3},
-   * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8002, "long_field": 20000, "timestamp":9, "latitude": 48.0001, "double_field": 1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 40, "duplicate_name_field": 4},
-   * {"source:type": "snort", "ip_src_addr":"192.168.1.8", "ip_src_port": 8001, "long_field": 10000, "timestamp":10, "latitude": 48.5839, "double_field": 1.00001, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 50, "duplicate_name_field": 5}
-   * ]
-   */
-  @Multiline
-  public static String snortData;
-
-  /**
-   * {
-   * "indices": ["bro", "snort"],
-   * "query": "*",
-   * "from": 0,
-   * "size": 10,
-   * "sort": [
-   *   {
-   *     "field": "timestamp",
-   *     "sortOrder": "desc"
-   *   }
-   * ]
-   * }
-   */
-  @Multiline
-  public static String allQuery;
-
-  /**
-   * {
-   * "indices": ["bro", "snort"],
-   * "query": "ip_src_addr:192.168.1.1",
-   * "from": 0,
-   * "size": 10,
-   * "sort": [
-   *   {
-   *     "field": "timestamp",
-   *     "sortOrder": "desc"
-   *   }
-   * ]
-   * }
-   */
-  @Multiline
-  public static String filterQuery;
-
-  /**
-   * {
-   * "indices": ["bro", "snort"],
-   * "query": "*",
-   * "from": 0,
-   * "size": 10,
-   * "sort": [
-   *   {
-   *     "field": "ip_src_port",
-   *     "sortOrder": "asc"
-   *   }
-   * ]
-   * }
-   */
-  @Multiline
-  public static String sortQuery;
-
-  /**
-   * {
-   * "indices": ["bro", "snort"],
-   * "query": "*",
-   * "from": 4,
-   * "size": 3,
-   * "sort": [
-   *   {
-   *     "field": "timestamp",
-   *     "sortOrder": "desc"
-   *   }
-   * ]
-   * }
-   */
-  @Multiline
-  public static String paginationQuery;
-
-  /**
-   * {
-   * "indices": ["bro"],
-   * "query": "*",
-   * "from": 0,
-   * "size": 10,
-   * "sort": [
-   *   {
-   *     "field": "timestamp",
-   *     "sortOrder": "desc"
-   *   }
-   * ]
-   * }
-   */
-  @Multiline
-  public static String indexQuery;
-
-  /**
-   * {
-   * "facetFields": ["source:type", "ip_src_addr", "ip_src_port", "long_field", "timestamp", "latitude", "double_field", "is_alert"],
-   * "indices": ["bro", "snort"],
-   * "query": "*",
-   * "from": 0,
-   * "size": 10,
-   * "sort": [
-   *   {
-   *     "field": "timestamp",
-   *     "sortOrder": "desc"
-   *   }
-   * ]
-   * }
-   */
-  @Multiline
-  public static String facetQuery;
-
-  /**
-   * {
-   * "facetFields": ["location_point"],
-   * "indices": ["bro", "snort"],
-   * "query": "*",
-   * "from": 0,
-   * "size": 10,
-   * "sort": [
-   *   {
-   *     "field": "timestamp",
-   *     "sortOrder": "desc"
-   *   }
-   * ]
-   * }
-   */
-  @Multiline
-  public static String badFacetQuery;
-
-  /**
-   * {
-   * "indices": ["bro", "snort"],
-   * "query": "*",
-   * "from": 0,
-   * "size": 10,
-   * "sort": [
-   *   {
-   *     "field": "timestamp",
-   *     "sortOrder": "desc"
-   *   }
-   * ]
-   * }
-   */
-  @Multiline
-  public static String disabledFacetQuery;
-
-  /**
-   * {
-   * "indices": ["bro", "snort"],
-   * "query": "*",
-   * "from": 0,
-   * "size": 101,
-   * "sort": [
-   *   {
-   *     "field": "timestamp",
-   *     "sortOrder": "desc"
-   *   }
-   * ]
-   * }
-   */
-  @Multiline
-  public static String exceededMaxResultsQuery;
-
-  protected IndexDao dao;
-  protected InMemoryComponent indexComponent;
-
-  @Before
-  public void setup() throws Exception {
-    indexComponent = startIndex();
-    loadTestData();
-    dao = createDao();
-  }
-
-  @Test
-  public void test() throws Exception {
-    //All Query Testcase
-    {
-      SearchRequest request = JSONUtils.INSTANCE.load(allQuery, SearchRequest.class);
-      SearchResponse response = dao.search(request);
-      Assert.assertEquals(10, response.getTotal());
-      List<SearchResult> results = response.getResults();
-      for(int i = 0;i < 5;++i) {
-        Assert.assertEquals("snort", results.get(i).getSource().get("source:type"));
-        Assert.assertEquals(10-i, results.get(i).getSource().get("timestamp"));
-      }
-      for(int i = 5;i < 10;++i) {
-        Assert.assertEquals("bro", results.get(i).getSource().get("source:type"));
-        Assert.assertEquals(10-i, results.get(i).getSource().get("timestamp"));
-      }
-    }
-    //Filter test case
-    {
-      SearchRequest request = JSONUtils.INSTANCE.load(filterQuery, SearchRequest.class);
-      SearchResponse response = dao.search(request);
-      Assert.assertEquals(3, response.getTotal());
-      List<SearchResult> results = response.getResults();
-      Assert.assertEquals("snort", results.get(0).getSource().get("source:type"));
-      Assert.assertEquals(9, results.get(0).getSource().get("timestamp"));
-      Assert.assertEquals("snort", results.get(1).getSource().get("source:type"));
-      Assert.assertEquals(7, results.get(1).getSource().get("timestamp"));
-      Assert.assertEquals("bro", results.get(2).getSource().get("source:type"));
-      Assert.assertEquals(1, results.get(2).getSource().get("timestamp"));
-    }
-    //Sort test case
-    {
-      SearchRequest request = JSONUtils.INSTANCE.load(sortQuery, SearchRequest.class);
-      SearchResponse response = dao.search(request);
-      Assert.assertEquals(10, response.getTotal());
-      List<SearchResult> results = response.getResults();
-      for(int i = 8001;i < 8011;++i) {
-        Assert.assertEquals(i, results.get(i-8001).getSource().get("ip_src_port"));
-      }
-    }
-    //pagination test case
-    {
-      SearchRequest request = JSONUtils.INSTANCE.load(paginationQuery, SearchRequest.class);
-      SearchResponse response = dao.search(request);
-      Assert.assertEquals(10, response.getTotal());
-      List<SearchResult> results = response.getResults();
-      Assert.assertEquals(3, results.size());
-      Assert.assertEquals("snort", results.get(0).getSource().get("source:type"));
-      Assert.assertEquals(6, results.get(0).getSource().get("timestamp"));
-      Assert.assertEquals("bro", results.get(1).getSource().get("source:type"));
-      Assert.assertEquals(5, results.get(1).getSource().get("timestamp"));
-      Assert.assertEquals("bro", results.get(2).getSource().get("source:type"));
-      Assert.assertEquals(4, results.get(2).getSource().get("timestamp"));
-    }
-    //Index query
-    {
-      SearchRequest request = JSONUtils.INSTANCE.load(indexQuery, SearchRequest.class);
-      SearchResponse response = dao.search(request);
-      Assert.assertEquals(5, response.getTotal());
-      List<SearchResult> results = response.getResults();
-      for(int i = 5,j=0;i > 0;i--,j++) {
-        Assert.assertEquals("bro", results.get(j).getSource().get("source:type"));
-        Assert.assertEquals(i, results.get(j).getSource().get("timestamp"));
-      }
-    }
-    //Facet query including all field types
-    {
-      SearchRequest request = JSONUtils.INSTANCE.load(facetQuery, SearchRequest.class);
-      SearchResponse response = dao.search(request);
-      Assert.assertEquals(10, response.getTotal());
-      Map<String, Map<String, Long>> facetCounts = response.getFacetCounts();
-      Assert.assertEquals(8, facetCounts.size());
-      Map<String, Long> sourceTypeCounts = facetCounts.get("source:type");
-      Assert.assertEquals(2, sourceTypeCounts.size());
-      Assert.assertEquals(new Long(5), sourceTypeCounts.get("bro"));
-      Assert.assertEquals(new Long(5), sourceTypeCounts.get("snort"));
-      Map<String, Long> ipSrcAddrCounts = facetCounts.get("ip_src_addr");
-      Assert.assertEquals(8, ipSrcAddrCounts.size());
-      Assert.assertEquals(new Long(3), ipSrcAddrCounts.get("192.168.1.1"));
-      Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.2"));
-      Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.3"));
-      Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.4"));
-      Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.5"));
-      Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.6"));
-      Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.7"));
-      Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.8"));
-      Map<String, Long> ipSrcPortCounts = facetCounts.get("ip_src_port");
-      Assert.assertEquals(10, ipSrcPortCounts.size());
-      Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8001"));
-      Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8002"));
-      Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8003"));
-      Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8004"));
-      Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8005"));
-      Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8006"));
-      Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8007"));
-      Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8008"));
-      Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8009"));
-      Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8010"));
-      Map<String, Long> longFieldCounts = facetCounts.get("long_field");
-      Assert.assertEquals(2, longFieldCounts.size());
-      Assert.assertEquals(new Long(8), longFieldCounts.get("10000"));
-      Assert.assertEquals(new Long(2), longFieldCounts.get("20000"));
-      Map<String, Long> timestampCounts = facetCounts.get("timestamp");
-      Assert.assertEquals(10, timestampCounts.size());
-      Assert.assertEquals(new Long(1), timestampCounts.get("1"));
-      Assert.assertEquals(new Long(1), timestampCounts.get("2"));
-      Assert.assertEquals(new Long(1), timestampCounts.get("3"));
-      Assert.assertEquals(new Long(1), timestampCounts.get("4"));
-      Assert.assertEquals(new Long(1), timestampCounts.get("5"));
-      Assert.assertEquals(new Long(1), timestampCounts.get("6"));
-      Assert.assertEquals(new Long(1), timestampCounts.get("7"));
-      Assert.assertEquals(new Long(1), timestampCounts.get("8"));
-      Assert.assertEquals(new Long(1), timestampCounts.get("9"));
-      Assert.assertEquals(new Long(1), timestampCounts.get("10"));
-      Map<String, Long> latitudeCounts = facetCounts.get("latitude");
-      Assert.assertEquals(2, latitudeCounts.size());
-      List<String> latitudeKeys = new ArrayList<>(latitudeCounts.keySet());
-      Collections.sort(latitudeKeys);
-      Assert.assertEquals(48.0001, Double.parseDouble(latitudeKeys.get(0)), 0.00001);
-      Assert.assertEquals(48.5839, Double.parseDouble(latitudeKeys.get(1)), 0.00001);
-      Assert.assertEquals(new Long(2), latitudeCounts.get(latitudeKeys.get(0)));
-      Assert.assertEquals(new Long(8), latitudeCounts.get(latitudeKeys.get(1)));
-      Map<String, Long> doubleFieldCounts = facetCounts.get("double_field");
-      Assert.assertEquals(2, doubleFieldCounts.size());
-      List<String> doubleFieldKeys = new ArrayList<>(doubleFieldCounts.keySet());
-      Collections.sort(doubleFieldKeys);
-      Assert.assertEquals(1.00001, Double.parseDouble(doubleFieldKeys.get(0)), 0.00001);
-      Assert.assertEquals(1.00002, Double.parseDouble(doubleFieldKeys.get(1)), 0.00001);
-      Assert.assertEquals(new Long(5), doubleFieldCounts.get(doubleFieldKeys.get(0)));
-      Assert.assertEquals(new Long(5), doubleFieldCounts.get(doubleFieldKeys.get(1)));
-      Map<String, Long> isAlertCounts = facetCounts.get("is_alert");
-      Assert.assertEquals(2, isAlertCounts.size());
-      Assert.assertEquals(new Long(6), isAlertCounts.get("true"));
-      Assert.assertEquals(new Long(4), isAlertCounts.get("false"));
-    }
-    //Bad facet query
-    {
-      SearchRequest request = JSONUtils.INSTANCE.load(badFacetQuery, SearchRequest.class);
-      try {
-        dao.search(request);
-        Assert.fail("Exception expected, but did not come.");
-      }
-      catch(InvalidSearchException ise) {
-        Assert.assertEquals("Could not execute search", ise.getMessage());
-      }
-    }
-    //Disabled facet query
-    {
-      SearchRequest request = JSONUtils.INSTANCE.load(disabledFacetQuery, SearchRequest.class);
-      SearchResponse response = dao.search(request);
-      Assert.assertNull(response.getFacetCounts());
-    }
-    //Exceeded maximum results query
-    {
-      SearchRequest request = JSONUtils.INSTANCE.load(exceededMaxResultsQuery, SearchRequest.class);
-      try {
-        dao.search(request);
-        Assert.fail("Exception expected, but did not come.");
-      }
-      catch(InvalidSearchException ise) {
-        Assert.assertEquals("Search result size must be less than 100", ise.getMessage());
-      }
-    }
-    // getColumnMetadata with multiple indices
-    {
-      Map<String, Map<String, FieldType>> fieldTypes = dao.getColumnMetadata(Arrays.asList("bro", "snort"));
-      Assert.assertEquals(2, fieldTypes.size());
-      Map<String, FieldType> broTypes = fieldTypes.get("bro");
-      Assert.assertEquals(11, broTypes.size());
-      Assert.assertEquals(FieldType.STRING, broTypes.get("source:type"));
-      Assert.assertEquals(FieldType.IP, broTypes.get("ip_src_addr"));
-      Assert.assertEquals(FieldType.INTEGER, broTypes.get("ip_src_port"));
-      Assert.assertEquals(FieldType.LONG, broTypes.get("long_field"));
-      Assert.assertEquals(FieldType.DATE, broTypes.get("timestamp"));
-      Assert.assertEquals(FieldType.FLOAT, broTypes.get("latitude"));
-      Assert.assertEquals(FieldType.DOUBLE, broTypes.get("double_field"));
-      Assert.assertEquals(FieldType.BOOLEAN, broTypes.get("is_alert"));
-      Assert.assertEquals(FieldType.OTHER, broTypes.get("location_point"));
-      Assert.assertEquals(FieldType.STRING, broTypes.get("bro_field"));
-      Assert.assertEquals(FieldType.STRING, broTypes.get("duplicate_name_field"));
-      Map<String, FieldType> snortTypes = fieldTypes.get("snort");
-      Assert.assertEquals(11, snortTypes.size());
-      Assert.assertEquals(FieldType.STRING, snortTypes.get("source:type"));
-      Assert.assertEquals(FieldType.IP, snortTypes.get("ip_src_addr"));
-      Assert.assertEquals(FieldType.INTEGER, snortTypes.get("ip_src_port"));
-      Assert.assertEquals(FieldType.LONG, snortTypes.get("long_field"));
-      Assert.assertEquals(FieldType.DATE, snortTypes.get("timestamp"));
-      Assert.assertEquals(FieldType.FLOAT, snortTypes.get("latitude"));
-      Assert.assertEquals(FieldType.DOUBLE, snortTypes.get("double_field"));
-      Assert.assertEquals(FieldType.BOOLEAN, snortTypes.get("is_alert"));
-      Assert.assertEquals(FieldType.OTHER, snortTypes.get("location_point"));
-      Assert.assertEquals(FieldType.INTEGER, snortTypes.get("snort_field"));
-      Assert.assertEquals(FieldType.INTEGER, snortTypes.get("duplicate_name_field"));
-    }
-    // getColumnMetadata with only bro
-    {
-      Map<String, Map<String, FieldType>> fieldTypes = dao.getColumnMetadata(Collections.singletonList("bro"));
-      Assert.assertEquals(1, fieldTypes.size());
-      Map<String, FieldType> broTypes = fieldTypes.get("bro");
-      Assert.assertEquals(11, broTypes.size());
-      Assert.assertEquals(FieldType.STRING, broTypes.get("bro_field"));
-    }
-    // getColumnMetadata with only snort
-    {
-      Map<String, Map<String, FieldType>> fieldTypes = dao.getColumnMetadata(Collections.singletonList("snort"));
-      Assert.assertEquals(1, fieldTypes.size());
-      Map<String, FieldType> snortTypes = fieldTypes.get("snort");
-      Assert.assertEquals(11, snortTypes.size());
-      Assert.assertEquals(FieldType.INTEGER, snortTypes.get("snort_field"));
-    }
-    // getCommonColumnMetadata with multiple Indices
-    {
-      Map<String, FieldType> fieldTypes = dao.getCommonColumnMetadata(Arrays.asList("bro", "snort"));
-      // Should only return fields in both
-      Assert.assertEquals(9, fieldTypes.size());
-      Assert.assertEquals(FieldType.STRING, fieldTypes.get("source:type"));
-      Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr"));
-      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port"));
-      Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field"));
-      Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp"));
-      Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude"));
-      Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("double_field"));
-      Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
-      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point"));
-    }
-    // getCommonColumnMetadata with only bro
-    {
-      Map<String, FieldType> fieldTypes = dao.getCommonColumnMetadata(Collections.singletonList("bro"));
-      Assert.assertEquals(11, fieldTypes.size());
-      Assert.assertEquals(FieldType.STRING, fieldTypes.get("bro_field"));
-      Assert.assertEquals(FieldType.STRING, fieldTypes.get("duplicate_name_field"));
-    }
-    // getCommonColumnMetadata with only snort
-    {
-      Map<String, FieldType> fieldTypes = dao.getCommonColumnMetadata(Collections.singletonList("snort"));
-      Assert.assertEquals(11, fieldTypes.size());
-      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("snort_field"));
-      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("duplicate_name_field"));
-    }
-  }
-
-  @After
-  public void stop() throws Exception {
-    indexComponent.stop();
-  }
-
-  protected abstract IndexDao createDao() throws Exception;
-  protected abstract InMemoryComponent startIndex() throws Exception;
-  protected abstract void loadTestData() throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
new file mode 100644
index 0000000..e262269
--- /dev/null
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
@@ -0,0 +1,477 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.indexing.dao;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.apache.metron.indexing.dao.search.SearchRequest;
+import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.indexing.dao.search.SearchResult;
+import org.apache.metron.integration.InMemoryComponent;
+import org.junit.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public abstract class SearchIntegrationTest {
+  /**
+   * [
+   * {"source:type": "bro", "ip_src_addr":"192.168.1.1", "ip_src_port": 8010, "long_field": 10000, "timestamp":1, "latitude": 48.5839, "double_field": 1.00001, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 1", "duplicate_name_field": "data 1"},
+   * {"source:type": "bro", "ip_src_addr":"192.168.1.2", "ip_src_port": 8009, "long_field": 20000, "timestamp":2, "latitude": 48.0001, "double_field": 1.00002, "is_alert":false, "location_point": "48.5839,7.7455", "bro_field": "bro data 2", "duplicate_name_field": "data 2"},
+   * {"source:type": "bro", "ip_src_addr":"192.168.1.3", "ip_src_port": 8008, "long_field": 10000, "timestamp":3, "latitude": 48.5839, "double_field": 1.00002, "is_alert":true, "location_point": "50.0,7.7455", "bro_field": "bro data 3", "duplicate_name_field": "data 3"},
+   * {"source:type": "bro", "ip_src_addr":"192.168.1.4", "ip_src_port": 8007, "long_field": 10000, "timestamp":4, "latitude": 48.5839, "double_field": 1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 4", "duplicate_name_field": "data 4"},
+   * {"source:type": "bro", "ip_src_addr":"192.168.1.5", "ip_src_port": 8006, "long_field": 10000, "timestamp":5, "latitude": 48.5839, "double_field": 1.00001, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 5", "duplicate_name_field": "data 5"}
+   * ]
+   */
+  @Multiline
+  public static String broData;
+
+  /**
+   * [
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 8005, "long_field": 10000, "timestamp":6, "latitude": 48.5839, "double_field": 1.00001, "is_alert":false, "location_point": "50.0,7.7455", "snort_field": 10, "duplicate_name_field": 1},
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "double_field": 1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 20, "duplicate_name_field": 2},
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.7", "ip_src_port": 8003, "long_field": 10000, "timestamp":8, "latitude": 48.5839, "double_field": 1.00001, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 30, "duplicate_name_field": 3},
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8002, "long_field": 20000, "timestamp":9, "latitude": 48.0001, "double_field": 1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 40, "duplicate_name_field": 4},
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.8", "ip_src_port": 8001, "long_field": 10000, "timestamp":10, "latitude": 48.5839, "double_field": 1.00001, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 50, "duplicate_name_field": 5}
+   * ]
+   */
+  @Multiline
+  public static String snortData;
+
+  /**
+   * {
+   * "indices": ["bro", "snort"],
+   * "query": "*",
+   * "from": 0,
+   * "size": 10,
+   * "sort": [
+   *   {
+   *     "field": "timestamp",
+   *     "sortOrder": "desc"
+   *   }
+   * ]
+   * }
+   */
+  @Multiline
+  public static String allQuery;
+
+  /**
+   * {
+   * "indices": ["bro", "snort"],
+   * "query": "ip_src_addr:192.168.1.1",
+   * "from": 0,
+   * "size": 10,
+   * "sort": [
+   *   {
+   *     "field": "timestamp",
+   *     "sortOrder": "desc"
+   *   }
+   * ]
+   * }
+   */
+  @Multiline
+  public static String filterQuery;
+
+  /**
+   * {
+   * "indices": ["bro", "snort"],
+   * "query": "*",
+   * "from": 0,
+   * "size": 10,
+   * "sort": [
+   *   {
+   *     "field": "ip_src_port",
+   *     "sortOrder": "asc"
+   *   }
+   * ]
+   * }
+   */
+  @Multiline
+  public static String sortQuery;
+
+  /**
+   * {
+   * "indices": ["bro", "snort"],
+   * "query": "*",
+   * "from": 4,
+   * "size": 3,
+   * "sort": [
+   *   {
+   *     "field": "timestamp",
+   *     "sortOrder": "desc"
+   *   }
+   * ]
+   * }
+   */
+  @Multiline
+  public static String paginationQuery;
+
+  /**
+   * {
+   * "indices": ["bro"],
+   * "query": "*",
+   * "from": 0,
+   * "size": 10,
+   * "sort": [
+   *   {
+   *     "field": "timestamp",
+   *     "sortOrder": "desc"
+   *   }
+   * ]
+   * }
+   */
+  @Multiline
+  public static String indexQuery;
+
+  /**
+   * {
+   * "facetFields": ["source:type", "ip_src_addr", "ip_src_port", "long_field", "timestamp", "latitude", "double_field", "is_alert"],
+   * "indices": ["bro", "snort"],
+   * "query": "*",
+   * "from": 0,
+   * "size": 10,
+   * "sort": [
+   *   {
+   *     "field": "timestamp",
+   *     "sortOrder": "desc"
+   *   }
+   * ]
+   * }
+   */
+  @Multiline
+  public static String facetQuery;
+
+  /**
+   * {
+   * "facetFields": ["location_point"],
+   * "indices": ["bro", "snort"],
+   * "query": "*",
+   * "from": 0,
+   * "size": 10,
+   * "sort": [
+   *   {
+   *     "field": "timestamp",
+   *     "sortOrder": "desc"
+   *   }
+   * ]
+   * }
+   */
+  @Multiline
+  public static String badFacetQuery;
+
+  /**
+   * {
+   * "indices": ["bro", "snort"],
+   * "query": "*",
+   * "from": 0,
+   * "size": 10,
+   * "sort": [
+   *   {
+   *     "field": "timestamp",
+   *     "sortOrder": "desc"
+   *   }
+   * ]
+   * }
+   */
+  @Multiline
+  public static String disabledFacetQuery;
+
+  /**
+   * {
+   * "indices": ["bro", "snort"],
+   * "query": "*",
+   * "from": 0,
+   * "size": 101,
+   * "sort": [
+   *   {
+   *     "field": "timestamp",
+   *     "sortOrder": "desc"
+   *   }
+   * ]
+   * }
+   */
+  @Multiline
+  public static String exceededMaxResultsQuery;
+
+  protected static IndexDao dao;
+  protected static InMemoryComponent indexComponent;
+
+  @Before
+  public synchronized void setup() throws Exception {
+    if(dao == null && indexComponent == null) {
+      indexComponent = startIndex();
+      loadTestData();
+      dao = createDao();
+    }
+  }
+
+  @Test
+  public void test() throws Exception {
+    //All Query Testcase
+    {
+      SearchRequest request = JSONUtils.INSTANCE.load(allQuery, SearchRequest.class);
+      SearchResponse response = dao.search(request);
+      Assert.assertEquals(10, response.getTotal());
+      List<SearchResult> results = response.getResults();
+      for(int i = 0;i < 5;++i) {
+        Assert.assertEquals("snort", results.get(i).getSource().get("source:type"));
+        Assert.assertEquals(10-i, results.get(i).getSource().get("timestamp"));
+      }
+      for(int i = 5;i < 10;++i) {
+        Assert.assertEquals("bro", results.get(i).getSource().get("source:type"));
+        Assert.assertEquals(10-i, results.get(i).getSource().get("timestamp"));
+      }
+    }
+    //Filter test case
+    {
+      SearchRequest request = JSONUtils.INSTANCE.load(filterQuery, SearchRequest.class);
+      SearchResponse response = dao.search(request);
+      Assert.assertEquals(3, response.getTotal());
+      List<SearchResult> results = response.getResults();
+      Assert.assertEquals("snort", results.get(0).getSource().get("source:type"));
+      Assert.assertEquals(9, results.get(0).getSource().get("timestamp"));
+      Assert.assertEquals("snort", results.get(1).getSource().get("source:type"));
+      Assert.assertEquals(7, results.get(1).getSource().get("timestamp"));
+      Assert.assertEquals("bro", results.get(2).getSource().get("source:type"));
+      Assert.assertEquals(1, results.get(2).getSource().get("timestamp"));
+    }
+    //Sort test case
+    {
+      SearchRequest request = JSONUtils.INSTANCE.load(sortQuery, SearchRequest.class);
+      SearchResponse response = dao.search(request);
+      Assert.assertEquals(10, response.getTotal());
+      List<SearchResult> results = response.getResults();
+      for(int i = 8001;i < 8011;++i) {
+        Assert.assertEquals(i, results.get(i-8001).getSource().get("ip_src_port"));
+      }
+    }
+    //pagination test case
+    {
+      SearchRequest request = JSONUtils.INSTANCE.load(paginationQuery, SearchRequest.class);
+      SearchResponse response = dao.search(request);
+      Assert.assertEquals(10, response.getTotal());
+      List<SearchResult> results = response.getResults();
+      Assert.assertEquals(3, results.size());
+      Assert.assertEquals("snort", results.get(0).getSource().get("source:type"));
+      Assert.assertEquals(6, results.get(0).getSource().get("timestamp"));
+      Assert.assertEquals("bro", results.get(1).getSource().get("source:type"));
+      Assert.assertEquals(5, results.get(1).getSource().get("timestamp"));
+      Assert.assertEquals("bro", results.get(2).getSource().get("source:type"));
+      Assert.assertEquals(4, results.get(2).getSource().get("timestamp"));
+    }
+    //Index query
+    {
+      SearchRequest request = JSONUtils.INSTANCE.load(indexQuery, SearchRequest.class);
+      SearchResponse response = dao.search(request);
+      Assert.assertEquals(5, response.getTotal());
+      List<SearchResult> results = response.getResults();
+      for(int i = 5,j=0;i > 0;i--,j++) {
+        Assert.assertEquals("bro", results.get(j).getSource().get("source:type"));
+        Assert.assertEquals(i, results.get(j).getSource().get("timestamp"));
+      }
+    }
+    //Facet query including all field types
+    {
+      SearchRequest request = JSONUtils.INSTANCE.load(facetQuery, SearchRequest.class);
+      SearchResponse response = dao.search(request);
+      Assert.assertEquals(10, response.getTotal());
+      Map<String, Map<String, Long>> facetCounts = response.getFacetCounts();
+      Assert.assertEquals(8, facetCounts.size());
+      Map<String, Long> sourceTypeCounts = facetCounts.get("source:type");
+      Assert.assertEquals(2, sourceTypeCounts.size());
+      Assert.assertEquals(new Long(5), sourceTypeCounts.get("bro"));
+      Assert.assertEquals(new Long(5), sourceTypeCounts.get("snort"));
+      Map<String, Long> ipSrcAddrCounts = facetCounts.get("ip_src_addr");
+      Assert.assertEquals(8, ipSrcAddrCounts.size());
+      Assert.assertEquals(new Long(3), ipSrcAddrCounts.get("192.168.1.1"));
+      Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.2"));
+      Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.3"));
+      Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.4"));
+      Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.5"));
+      Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.6"));
+      Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.7"));
+      Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.8"));
+      Map<String, Long> ipSrcPortCounts = facetCounts.get("ip_src_port");
+      Assert.assertEquals(10, ipSrcPortCounts.size());
+      Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8001"));
+      Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8002"));
+      Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8003"));
+      Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8004"));
+      Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8005"));
+      Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8006"));
+      Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8007"));
+      Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8008"));
+      Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8009"));
+      Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8010"));
+      Map<String, Long> longFieldCounts = facetCounts.get("long_field");
+      Assert.assertEquals(2, longFieldCounts.size());
+      Assert.assertEquals(new Long(8), longFieldCounts.get("10000"));
+      Assert.assertEquals(new Long(2), longFieldCounts.get("20000"));
+      Map<String, Long> timestampCounts = facetCounts.get("timestamp");
+      Assert.assertEquals(10, timestampCounts.size());
+      Assert.assertEquals(new Long(1), timestampCounts.get("1"));
+      Assert.assertEquals(new Long(1), timestampCounts.get("2"));
+      Assert.assertEquals(new Long(1), timestampCounts.get("3"));
+      Assert.assertEquals(new Long(1), timestampCounts.get("4"));
+      Assert.assertEquals(new Long(1), timestampCounts.get("5"));
+      Assert.assertEquals(new Long(1), timestampCounts.get("6"));
+      Assert.assertEquals(new Long(1), timestampCounts.get("7"));
+      Assert.assertEquals(new Long(1), timestampCounts.get("8"));
+      Assert.assertEquals(new Long(1), timestampCounts.get("9"));
+      Assert.assertEquals(new Long(1), timestampCounts.get("10"));
+      Map<String, Long> latitudeCounts = facetCounts.get("latitude");
+      Assert.assertEquals(2, latitudeCounts.size());
+      List<String> latitudeKeys = new ArrayList<>(latitudeCounts.keySet());
+      Collections.sort(latitudeKeys);
+      Assert.assertEquals(48.0001, Double.parseDouble(latitudeKeys.get(0)), 0.00001);
+      Assert.assertEquals(48.5839, Double.parseDouble(latitudeKeys.get(1)), 0.00001);
+      Assert.assertEquals(new Long(2), latitudeCounts.get(latitudeKeys.get(0)));
+      Assert.assertEquals(new Long(8), latitudeCounts.get(latitudeKeys.get(1)));
+      Map<String, Long> doubleFieldCounts = facetCounts.get("double_field");
+      Assert.assertEquals(2, doubleFieldCounts.size());
+      List<String> doubleFieldKeys = new ArrayList<>(doubleFieldCounts.keySet());
+      Collections.sort(doubleFieldKeys);
+      Assert.assertEquals(1.00001, Double.parseDouble(doubleFieldKeys.get(0)), 0.00001);
+      Assert.assertEquals(1.00002, Double.parseDouble(doubleFieldKeys.get(1)), 0.00001);
+      Assert.assertEquals(new Long(5), doubleFieldCounts.get(doubleFieldKeys.get(0)));
+      Assert.assertEquals(new Long(5), doubleFieldCounts.get(doubleFieldKeys.get(1)));
+      Map<String, Long> isAlertCounts = facetCounts.get("is_alert");
+      Assert.assertEquals(2, isAlertCounts.size());
+      Assert.assertEquals(new Long(6), isAlertCounts.get("true"));
+      Assert.assertEquals(new Long(4), isAlertCounts.get("false"));
+    }
+    //Bad facet query
+    {
+      SearchRequest request = JSONUtils.INSTANCE.load(badFacetQuery, SearchRequest.class);
+      try {
+        dao.search(request);
+        Assert.fail("Exception expected, but did not come.");
+      }
+      catch(InvalidSearchException ise) {
+        Assert.assertEquals("Could not execute search", ise.getMessage());
+      }
+    }
+    //Disabled facet query
+    {
+      SearchRequest request = JSONUtils.INSTANCE.load(disabledFacetQuery, SearchRequest.class);
+      SearchResponse response = dao.search(request);
+      Assert.assertNull(response.getFacetCounts());
+    }
+    //Exceeded maximum results query
+    {
+      SearchRequest request = JSONUtils.INSTANCE.load(exceededMaxResultsQuery, SearchRequest.class);
+      try {
+        dao.search(request);
+        Assert.fail("Exception expected, but did not come.");
+      }
+      catch(InvalidSearchException ise) {
+        Assert.assertEquals("Search result size must be less than 100", ise.getMessage());
+      }
+    }
+    // getColumnMetadata with multiple indices
+    {
+      Map<String, Map<String, FieldType>> fieldTypes = dao.getColumnMetadata(Arrays.asList("bro", "snort"));
+      Assert.assertEquals(2, fieldTypes.size());
+      Map<String, FieldType> broTypes = fieldTypes.get("bro");
+      Assert.assertEquals(11, broTypes.size());
+      Assert.assertEquals(FieldType.STRING, broTypes.get("source:type"));
+      Assert.assertEquals(FieldType.IP, broTypes.get("ip_src_addr"));
+      Assert.assertEquals(FieldType.INTEGER, broTypes.get("ip_src_port"));
+      Assert.assertEquals(FieldType.LONG, broTypes.get("long_field"));
+      Assert.assertEquals(FieldType.DATE, broTypes.get("timestamp"));
+      Assert.assertEquals(FieldType.FLOAT, broTypes.get("latitude"));
+      Assert.assertEquals(FieldType.DOUBLE, broTypes.get("double_field"));
+      Assert.assertEquals(FieldType.BOOLEAN, broTypes.get("is_alert"));
+      Assert.assertEquals(FieldType.OTHER, broTypes.get("location_point"));
+      Assert.assertEquals(FieldType.STRING, broTypes.get("bro_field"));
+      Assert.assertEquals(FieldType.STRING, broTypes.get("duplicate_name_field"));
+      Map<String, FieldType> snortTypes = fieldTypes.get("snort");
+      Assert.assertEquals(11, snortTypes.size());
+      Assert.assertEquals(FieldType.STRING, snortTypes.get("source:type"));
+      Assert.assertEquals(FieldType.IP, snortTypes.get("ip_src_addr"));
+      Assert.assertEquals(FieldType.INTEGER, snortTypes.get("ip_src_port"));
+      Assert.assertEquals(FieldType.LONG, snortTypes.get("long_field"));
+      Assert.assertEquals(FieldType.DATE, snortTypes.get("timestamp"));
+      Assert.assertEquals(FieldType.FLOAT, snortTypes.get("latitude"));
+      Assert.assertEquals(FieldType.DOUBLE, snortTypes.get("double_field"));
+      Assert.assertEquals(FieldType.BOOLEAN, snortTypes.get("is_alert"));
+      Assert.assertEquals(FieldType.OTHER, snortTypes.get("location_point"));
+      Assert.assertEquals(FieldType.INTEGER, snortTypes.get("snort_field"));
+      Assert.assertEquals(FieldType.INTEGER, snortTypes.get("duplicate_name_field"));
+    }
+    // getColumnMetadata with only bro
+    {
+      Map<String, Map<String, FieldType>> fieldTypes = dao.getColumnMetadata(Collections.singletonList("bro"));
+      Assert.assertEquals(1, fieldTypes.size());
+      Map<String, FieldType> broTypes = fieldTypes.get("bro");
+      Assert.assertEquals(11, broTypes.size());
+      Assert.assertEquals(FieldType.STRING, broTypes.get("bro_field"));
+    }
+    // getColumnMetadata with only snort
+    {
+      Map<String, Map<String, FieldType>> fieldTypes = dao.getColumnMetadata(Collections.singletonList("snort"));
+      Assert.assertEquals(1, fieldTypes.size());
+      Map<String, FieldType> snortTypes = fieldTypes.get("snort");
+      Assert.assertEquals(11, snortTypes.size());
+      Assert.assertEquals(FieldType.INTEGER, snortTypes.get("snort_field"));
+    }
+    // getCommonColumnMetadata with multiple Indices
+    {
+      Map<String, FieldType> fieldTypes = dao.getCommonColumnMetadata(Arrays.asList("bro", "snort"));
+      // Should only return fields in both
+      Assert.assertEquals(9, fieldTypes.size());
+      Assert.assertEquals(FieldType.STRING, fieldTypes.get("source:type"));
+      Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr"));
+      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port"));
+      Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field"));
+      Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp"));
+      Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude"));
+      Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("double_field"));
+      Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
+      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point"));
+    }
+    // getCommonColumnMetadata with only bro
+    {
+      Map<String, FieldType> fieldTypes = dao.getCommonColumnMetadata(Collections.singletonList("bro"));
+      Assert.assertEquals(11, fieldTypes.size());
+      Assert.assertEquals(FieldType.STRING, fieldTypes.get("bro_field"));
+      Assert.assertEquals(FieldType.STRING, fieldTypes.get("duplicate_name_field"));
+    }
+    // getCommonColumnMetadata with only snort
+    {
+      Map<String, FieldType> fieldTypes = dao.getCommonColumnMetadata(Collections.singletonList("snort"));
+      Assert.assertEquals(11, fieldTypes.size());
+      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("snort_field"));
+      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("duplicate_name_field"));
+    }
+  }
+
+  @AfterClass
+  public static void stop() throws Exception {
+    indexComponent.stop();
+  }
+
+  protected abstract IndexDao createDao() throws Exception;
+  protected abstract InMemoryComponent startIndex() throws Exception;
+  protected abstract void loadTestData() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json b/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json
index 8d3005f..9292f72 100644
--- a/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json
+++ b/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json
@@ -23,5 +23,7 @@
   "profiler.client.salt.divisor": "1000",
   "hbase.provider.impl": "org.apache.metron.hbase.HTableProvider",
 
-  "geo.hdfs.file": "src/test/resources/GeoLite/GeoIP2-City-Test.mmdb.gz"
+  "geo.hdfs.file": "src/test/resources/GeoLite/GeoIP2-City-Test.mmdb.gz",
+  "update.hbase.table" : "updates",
+  "update.hbase.cf" : "t"
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-parsers/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/pom.xml b/metron-platform/metron-parsers/pom.xml
index c99fb8d..b7c21ff 100644
--- a/metron-platform/metron-parsers/pom.xml
+++ b/metron-platform/metron-parsers/pom.xml
@@ -51,6 +51,13 @@
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
+            <artifactId>metron-hbase</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
             <artifactId>metron-enrichment</artifactId>
             <version>${project.parent.version}</version>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
index b9b3246..5f4b3fd 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
@@ -26,10 +26,10 @@ import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.enrichment.converter.EnrichmentConverter;
 import org.apache.metron.enrichment.converter.EnrichmentKey;
 import org.apache.metron.enrichment.converter.EnrichmentValue;
-import org.apache.metron.enrichment.integration.mock.MockTableProvider;
 import org.apache.metron.enrichment.lookup.LookupKV;
-import org.apache.metron.test.mock.MockHTable;
 import org.apache.metron.enrichment.writer.SimpleHbaseEnrichmentWriter;
+import org.apache.metron.hbase.mock.MockHTable;
+import org.apache.metron.hbase.mock.MockHBaseTableProvider;
 import org.json.simple.JSONObject;
 import org.junit.Assert;
 import org.junit.Before;
@@ -50,11 +50,12 @@ public class SimpleHBaseEnrichmentWriterTest {
     put(SimpleHbaseEnrichmentWriter.Configurations.HBASE_TABLE.getKey(), TABLE_NAME);
     put(SimpleHbaseEnrichmentWriter.Configurations.HBASE_CF.getKey(), TABLE_CF);
     put(SimpleHbaseEnrichmentWriter.Configurations.ENRICHMENT_TYPE.getKey(), ENRICHMENT_TYPE);
-    put(SimpleHbaseEnrichmentWriter.Configurations.HBASE_PROVIDER.getKey(), MockTableProvider.class.getName());
+    put(SimpleHbaseEnrichmentWriter.Configurations.HBASE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
   }};
+
   @Before
   public void setupMockTable() {
-    MockTableProvider.addTable(TABLE_NAME, TABLE_CF);
+    MockHBaseTableProvider.addToCache(TABLE_NAME, TABLE_CF);
   }
   @Test
   public void testBatchOneNormalPath() throws Exception {
@@ -140,7 +141,7 @@ public class SimpleHBaseEnrichmentWriterTest {
     Assert.assertEquals(2, values.get(0).getValue().getMetadata().size());
   }
   public static List<LookupKV<EnrichmentKey, EnrichmentValue>> getValues() throws IOException {
-    MockHTable table = MockTableProvider.getTable(TABLE_NAME);
+    MockHTable table = (MockHTable) MockHBaseTableProvider.getFromCache(TABLE_NAME);
     Assert.assertNotNull(table);
     List<LookupKV<EnrichmentKey, EnrichmentValue>> ret = new ArrayList<>();
     EnrichmentConverter converter = new EnrichmentConverter();

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
index 4efe28b..4f513be 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
@@ -29,13 +29,13 @@ import org.apache.metron.enrichment.converter.EnrichmentConverter;
 import org.apache.metron.enrichment.converter.EnrichmentKey;
 import org.apache.metron.enrichment.converter.EnrichmentValue;
 import org.apache.metron.enrichment.integration.components.ConfigUploadComponent;
-import org.apache.metron.enrichment.integration.mock.MockTableProvider;
 import org.apache.metron.enrichment.lookup.LookupKV;
+import org.apache.metron.hbase.mock.MockHTable;
+import org.apache.metron.hbase.mock.MockHBaseTableProvider;
 import org.apache.metron.integration.*;
 import org.apache.metron.integration.components.KafkaComponent;
 import org.apache.metron.integration.components.ZKServerComponent;
 import org.apache.metron.parsers.integration.components.ParserTopologyComponent;
-import org.apache.metron.test.mock.MockHTable;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -55,7 +55,7 @@ public class SimpleHbaseEnrichmentWriterIntegrationTest extends BaseIntegrationT
     ,"shew.cf" : "cf"
     ,"shew.keyColumns" : "col2"
     ,"shew.enrichmentType" : "et"
-    ,"shew.hbaseProvider" : "org.apache.metron.enrichment.integration.mock.MockTableProvider"
+    ,"shew.hbaseProvider" : "org.apache.metron.hbase.mock.MockHBaseTableProvider"
     ,"columns" : {
                 "col1" : 0
                ,"col2" : 1
@@ -75,7 +75,7 @@ public class SimpleHbaseEnrichmentWriterIntegrationTest extends BaseIntegrationT
       add(Bytes.toBytes("col21,col22,col23"));
       add(Bytes.toBytes("col31,col32,col33"));
     }};
-    MockTableProvider.addTable(sensorType, "cf");
+    MockHBaseTableProvider.addToCache(sensorType, "cf");
     final Properties topologyProperties = new Properties();
     final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties);
     final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{
@@ -112,7 +112,7 @@ public class SimpleHbaseEnrichmentWriterIntegrationTest extends BaseIntegrationT
 
                 @Override
                 public ReadinessState process(ComponentRunner runner) {
-                  MockHTable table = MockTableProvider.getTable(sensorType);
+                  MockHTable table = (MockHTable) MockHBaseTableProvider.getFromCache(sensorType);
                   if (table != null && table.size() == inputMessages.size()) {
                     EnrichmentConverter converter = new EnrichmentConverter();
                     messages = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
deleted file mode 100644
index 0403d1b..0000000
--- a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
+++ /dev/null
@@ -1,722 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.test.mock;
-
-
-import com.google.common.collect.ImmutableList;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Append;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.RowMutations;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.filter.CompareFilter;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.NavigableSet;
-import java.util.NoSuchElementException;
-import java.util.TreeMap;
-
-/**
- * MockHTable.
- *
- * This implementation is a selected excerpt from https://gist.github.com/agaoglu/613217
- */
-public class MockHTable implements HTableInterface {
-
-
-  public static class Provider implements Serializable {
-    private static Map<String, HTableInterface> _cache = new HashMap<>();
-    public HTableInterface getTable(Configuration config, String tableName) throws IOException {
-      HTableInterface ret = _cache.get(tableName);
-      return ret;
-    }
-
-    public static HTableInterface getFromCache(String tableName) {
-      return _cache.get(tableName);
-    }
-
-    public static HTableInterface addToCache(String tableName, String... columnFamilies) {
-      MockHTable ret =  new MockHTable(tableName, columnFamilies);
-      _cache.put(tableName, ret);
-      return ret;
-    }
-
-    public static void clear() {
-      _cache.clear();
-    }
-  }
-
-  private final String tableName;
-  private final List<String> columnFamilies = new ArrayList<>();
-  private HColumnDescriptor[] descriptors;
-  private final List<Put> putLog;
-  private NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data
-          = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-
-  private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, int maxVersions) {
-    return toKeyValue(row, rowdata, 0, Long.MAX_VALUE, maxVersions);
-  }
-
-  private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, long timestampStart, long timestampEnd, int maxVersions) {
-    List<KeyValue> ret = new ArrayList<KeyValue>();
-    for (byte[] family : rowdata.keySet())
-      for (byte[] qualifier : rowdata.get(family).keySet()) {
-        int versionsAdded = 0;
-        for (Map.Entry<Long, byte[]> tsToVal : rowdata.get(family).get(qualifier).descendingMap().entrySet()) {
-          if (versionsAdded++ == maxVersions)
-            break;
-          Long timestamp = tsToVal.getKey();
-          if (timestamp < timestampStart)
-            continue;
-          if (timestamp > timestampEnd)
-            continue;
-          byte[] value = tsToVal.getValue();
-          ret.add(new KeyValue(row, family, qualifier, timestamp, value));
-        }
-      }
-    return ret;
-  }
-  public MockHTable(String tableName) {
-    this.tableName = tableName;
-    this.putLog = new ArrayList<>();
-  }
-
-  public MockHTable(String tableName, String... columnFamilies) {
-    this(tableName);
-    for(String cf : columnFamilies) {
-      addColumnFamily(cf);
-    }
-  }
-
-  public int size() {
-    return data.size();
-  }
-
-  public void addColumnFamily(String columnFamily) {
-    this.columnFamilies.add(columnFamily);
-    descriptors = new HColumnDescriptor[columnFamilies.size()];
-    int i = 0;
-    for(String cf : columnFamilies) {
-      descriptors[i++] = new HColumnDescriptor(cf);
-    }
-  }
-
-  @Override
-  public byte[] getTableName() {
-    return Bytes.toBytes(tableName);
-  }
-
-  @Override
-  public TableName getName() {
-    return TableName.valueOf(tableName);
-  }
-
-  @Override
-  public Configuration getConfiguration() {
-    return HBaseConfiguration.create();
-  }
-
-  @Override
-  public HTableDescriptor getTableDescriptor() throws IOException {
-    HTableDescriptor ret = new HTableDescriptor(tableName);
-    for(HColumnDescriptor c : descriptors) {
-      ret.addFamily(c);
-    }
-    return ret;
-  }
-
-  @Override
-  public boolean exists(Get get) throws IOException {
-    if(get.getFamilyMap() == null || get.getFamilyMap().size() == 0) {
-      return data.containsKey(get.getRow());
-    } else {
-      byte[] row = get.getRow();
-      if(!data.containsKey(row)) {
-        return false;
-      }
-      for(byte[] family : get.getFamilyMap().keySet()) {
-        if(!data.get(row).containsKey(family)) {
-          return false;
-        } else {
-          return true;
-        }
-      }
-      return true;
-    }
-  }
-
-  /**
-   * Test for the existence of columns in the table, as specified by the Gets.
-   *
-   * <p>This will return an array of booleans. Each value will be true if the related Get matches
-   * one or more keys, false if not.
-   *
-   * <p>This is a server-side call so it prevents any data from being transferred to
-   * the client.
-   *
-   * @param gets the Gets
-   * @return Array of boolean.  True if the specified Get matches one or more keys, false if not.
-   * @throws IOException e
-   */
-  @Override
-  public boolean[] existsAll(List<Get> gets) throws IOException {
-    boolean[] ret = new boolean[gets.size()];
-    int i = 0;
-    for(boolean b : exists(gets)) {
-      ret[i++] = b;
-    }
-    return ret;
-  }
-
-  @Override
-  public Boolean[] exists(List<Get> list) throws IOException {
-    Boolean[] ret = new Boolean[list.size()];
-    int i = 0;
-    for(Get g : list) {
-      ret[i++] = exists(g);
-    }
-    return ret;
-  }
-
-  @Override
-  public void batch(List<? extends Row> list, Object[] objects) throws IOException, InterruptedException {
-    Object[] results = batch(list);
-    System.arraycopy(results, 0, objects, 0, results.length);
-  }
-
-  /**
-   * @param actions
-   * @deprecated
-   */
-  @Deprecated
-  @Override
-  public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
-    List<Result> results = new ArrayList<Result>();
-    for (Row r : actions) {
-      if (r instanceof Delete) {
-        delete((Delete) r);
-        continue;
-      }
-      if (r instanceof Put) {
-        put((Put) r);
-        continue;
-      }
-      if (r instanceof Get) {
-        results.add(get((Get) r));
-      }
-    }
-    return results.toArray();
-  }
-
-  @Override
-  public <R> void batchCallback(List<? extends Row> list, Object[] objects, Batch.Callback<R> callback) throws IOException, InterruptedException {
-    throw new UnsupportedOperationException();
-
-  }
-
-  /**
-   * @param list
-   * @param callback
-   * @deprecated
-   */
-  @Deprecated
-  @Override
-  public <R> Object[] batchCallback(List<? extends Row> list, Batch.Callback<R> callback) throws IOException, InterruptedException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Result get(Get get) throws IOException {
-    if (!data.containsKey(get.getRow()))
-      return new Result();
-    byte[] row = get.getRow();
-    List<KeyValue> kvs = new ArrayList<KeyValue>();
-    if (!get.hasFamilies()) {
-      kvs = toKeyValue(row, data.get(row), get.getMaxVersions());
-    } else {
-      for (byte[] family : get.getFamilyMap().keySet()){
-        if (data.get(row).get(family) == null)
-          continue;
-        NavigableSet<byte[]> qualifiers = get.getFamilyMap().get(family);
-        if (qualifiers == null || qualifiers.isEmpty())
-          qualifiers = data.get(row).get(family).navigableKeySet();
-        for (byte[] qualifier : qualifiers){
-          if (qualifier == null)
-            qualifier = "".getBytes();
-          if (!data.get(row).containsKey(family) ||
-                  !data.get(row).get(family).containsKey(qualifier) ||
-                  data.get(row).get(family).get(qualifier).isEmpty())
-            continue;
-          Map.Entry<Long, byte[]> timestampAndValue = data.get(row).get(family).get(qualifier).lastEntry();
-          kvs.add(new KeyValue(row,family, qualifier, timestampAndValue.getKey(), timestampAndValue.getValue()));
-        }
-      }
-    }
-    Filter filter = get.getFilter();
-    if (filter != null) {
-      filter.reset();
-      List<KeyValue> nkvs = new ArrayList<KeyValue>(kvs.size());
-      for (KeyValue kv : kvs) {
-        if (filter.filterAllRemaining()) {
-          break;
-        }
-        if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
-          continue;
-        }
-        if (filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE) {
-          nkvs.add(kv);
-        }
-        // ignoring next key hint which is a optimization to reduce file system IO
-      }
-      if (filter.hasFilterRow()) {
-        filter.filterRow();
-      }
-      kvs = nkvs;
-    }
-
-    return new Result(kvs);
-  }
-
-  @Override
-  public Result[] get(List<Get> list) throws IOException {
-    Result[] ret = new Result[list.size()];
-    int i = 0;
-    for(Get g : list) {
-      ret[i++] = get(g);
-    }
-    return ret;
-  }
-
-  /**
-   * @param bytes
-   * @param bytes1
-   * @deprecated
-   */
-  @Deprecated
-  @Override
-  public Result getRowOrBefore(byte[] bytes, byte[] bytes1) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public ResultScanner getScanner(Scan scan) throws IOException {
-    final List<Result> ret = new ArrayList<Result>();
-    byte[] st = scan.getStartRow();
-    byte[] sp = scan.getStopRow();
-    Filter filter = scan.getFilter();
-
-    for (byte[] row : data.keySet()){
-      // if row is equal to startRow emit it. When startRow (inclusive) and
-      // stopRow (exclusive) is the same, it should not be excluded which would
-      // happen w/o this control.
-      if (st != null && st.length > 0 &&
-              Bytes.BYTES_COMPARATOR.compare(st, row) != 0) {
-        // if row is before startRow do not emit, pass to next row
-        if (st != null && st.length > 0 &&
-                Bytes.BYTES_COMPARATOR.compare(st, row) > 0)
-          continue;
-        // if row is equal to stopRow or after it do not emit, stop iteration
-        if (sp != null && sp.length > 0 &&
-                Bytes.BYTES_COMPARATOR.compare(sp, row) <= 0)
-          break;
-      }
-
-      List<KeyValue> kvs = null;
-      if (!scan.hasFamilies()) {
-        kvs = toKeyValue(row, data.get(row), scan.getTimeRange().getMin(), scan.getTimeRange().getMax(), scan.getMaxVersions());
-      } else {
-        kvs = new ArrayList<KeyValue>();
-        for (byte[] family : scan.getFamilyMap().keySet()){
-          if (data.get(row).get(family) == null)
-            continue;
-          NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family);
-          if (qualifiers == null || qualifiers.isEmpty())
-            qualifiers = data.get(row).get(family).navigableKeySet();
-          for (byte[] qualifier : qualifiers){
-            if (data.get(row).get(family).get(qualifier) == null)
-              continue;
-            for (Long timestamp : data.get(row).get(family).get(qualifier).descendingKeySet()){
-              if (timestamp < scan.getTimeRange().getMin())
-                continue;
-              if (timestamp > scan.getTimeRange().getMax())
-                continue;
-              byte[] value = data.get(row).get(family).get(qualifier).get(timestamp);
-              kvs.add(new KeyValue(row, family, qualifier, timestamp, value));
-              if(kvs.size() == scan.getMaxVersions()) {
-                break;
-              }
-            }
-          }
-        }
-      }
-      if (filter != null) {
-        filter.reset();
-        List<KeyValue> nkvs = new ArrayList<KeyValue>(kvs.size());
-        for (KeyValue kv : kvs) {
-          if (filter.filterAllRemaining()) {
-            break;
-          }
-          if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
-            continue;
-          }
-          Filter.ReturnCode filterResult = filter.filterKeyValue(kv);
-          if (filterResult == Filter.ReturnCode.INCLUDE) {
-            nkvs.add(kv);
-          } else if (filterResult == Filter.ReturnCode.NEXT_ROW) {
-            break;
-          }
-          // ignoring next key hint which is a optimization to reduce file system IO
-        }
-        if (filter.hasFilterRow()) {
-          filter.filterRow();
-        }
-        kvs = nkvs;
-      }
-      if (!kvs.isEmpty()) {
-        ret.add(new Result(kvs));
-      }
-    }
-
-    return new ResultScanner() {
-      private final Iterator<Result> iterator = ret.iterator();
-      @Override
-      public Iterator<Result> iterator() {
-        return iterator;
-      }
-      @Override
-      public Result[] next(int nbRows) throws IOException {
-        ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
-        for(int i = 0; i < nbRows; i++) {
-          Result next = next();
-          if (next != null) {
-            resultSets.add(next);
-          } else {
-            break;
-          }
-        }
-        return resultSets.toArray(new Result[resultSets.size()]);
-      }
-      @Override
-      public Result next() throws IOException {
-        try {
-          return iterator().next();
-        } catch (NoSuchElementException e) {
-          return null;
-        }
-      }
-      @Override
-      public void close() {}
-    };
-  }
-  @Override
-  public ResultScanner getScanner(byte[] family) throws IOException {
-    Scan scan = new Scan();
-    scan.addFamily(family);
-    return getScanner(scan);
-  }
-
-  @Override
-  public ResultScanner getScanner(byte[] family, byte[] qualifier)
-          throws IOException {
-    Scan scan = new Scan();
-    scan.addColumn(family, qualifier);
-    return getScanner(scan);
-  }
-
-  public List<Put> getPutLog() {
-    synchronized (putLog) {
-      return ImmutableList.copyOf(putLog);
-    }
-  }
-
-  public void addToPutLog(Put put) {
-    synchronized(putLog) {
-      putLog.add(put);
-    }
-  }
-
-  public void clear() {
-    synchronized (putLog) {
-      putLog.clear();
-    }
-    data.clear();
-  }
-
-  @Override
-  public void put(Put put) throws IOException {
-    addToPutLog(put);
-
-    byte[] row = put.getRow();
-    NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = forceFind(data, row, new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>(Bytes.BYTES_COMPARATOR));
-    for (byte[] family : put.getFamilyMap().keySet()){
-      NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = forceFind(rowData, family, new TreeMap<byte[], NavigableMap<Long, byte[]>>(Bytes.BYTES_COMPARATOR));
-      for (KeyValue kv : put.getFamilyMap().get(family)){
-        kv.updateLatestStamp(Bytes.toBytes(System.currentTimeMillis()));
-        byte[] qualifier = kv.getQualifier();
-        NavigableMap<Long, byte[]> qualifierData = forceFind(familyData, qualifier, new TreeMap<Long, byte[]>());
-        qualifierData.put(kv.getTimestamp(), kv.getValue());
-      }
-    }
-  }
-
-  /**
-   * Helper method to find a key in a map. If key is not found, newObject is
-   * added to map and returned
-   *
-   * @param map
-   *          map to extract value from
-   * @param key
-   *          key to look for
-   * @param newObject
-   *          set key to this if not found
-   * @return found value or newObject if not found
-   */
-  private <K, V> V forceFind(NavigableMap<K, V> map, K key, V newObject){
-    V data = map.get(key);
-    if (data == null){
-      data = newObject;
-      map.put(key, data);
-    }
-    return data;
-  }
-
-  @Override
-  public void put(List<Put> puts) throws IOException {
-    for (Put put : puts)
-      put(put);
-  }
-
-  @Override
-  public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, byte[] bytes3, Put put) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * Atomically checks if a row/family/qualifier value matches the expected
-   * value. If it does, it adds the put.  If the passed value is null, the check
-   * is for the lack of column (ie: non-existance)
-   *
-   * @param row       to check
-   * @param family    column family to check
-   * @param qualifier column qualifier to check
-   * @param compareOp comparison operator to use
-   * @param value     the expected value
-   * @param put       data to put if check succeeds
-   * @return true if the new put was executed, false otherwise
-   * @throws IOException e
-   */
-  @Override
-  public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException {
-    return false;
-  }
-
-  @Override
-  public void delete(Delete delete) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void delete(List<Delete> list) throws IOException {
-    throw new UnsupportedOperationException();
-
-  }
-
-  @Override
-  public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, byte[] bytes3, Delete delete) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * Atomically checks if a row/family/qualifier value matches the expected
-   * value. If it does, it adds the delete.  If the passed value is null, the
-   * check is for the lack of column (ie: non-existance)
-   *
-   * @param row       to check
-   * @param family    column family to check
-   * @param qualifier column qualifier to check
-   * @param compareOp comparison operator to use
-   * @param value     the expected value
-   * @param delete    data to delete if check succeeds
-   * @return true if the new delete was executed, false otherwise
-   * @throws IOException e
-   */
-  @Override
-  public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException {
-    return false;
-  }
-
-  @Override
-  public void mutateRow(RowMutations rowMutations) throws IOException {
-    throw new UnsupportedOperationException();
-
-  }
-
-  @Override
-  public Result append(Append append) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Result increment(Increment increment) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l, Durability durability) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * @param bytes
-   * @param bytes1
-   * @param bytes2
-   * @param l
-   * @param b
-   * @deprecated
-   */
-  @Deprecated
-  @Override
-  public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l, boolean b) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public boolean isAutoFlush() {
-    return autoflush;
-  }
-
-  @Override
-  public void flushCommits() throws IOException {
-
-  }
-
-  @Override
-  public void close() throws IOException {
-
-  }
-
-  @Override
-  public CoprocessorRpcChannel coprocessorService(byte[] bytes) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> aClass, byte[] bytes, byte[] bytes1, Batch.Call<T, R> call) throws ServiceException, Throwable {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public <T extends Service, R> void coprocessorService(Class<T> aClass, byte[] bytes, byte[] bytes1, Batch.Call<T, R> call, Batch.Callback<R> callback) throws ServiceException, Throwable {
-    throw new UnsupportedOperationException();
-  }
-
-  boolean autoflush = true;
-
-  /**
-   * @param b
-   * @deprecated
-   */
-  @Deprecated
-  @Override
-  public void setAutoFlush(boolean b) {
-    autoflush = b;
-  }
-
-  @Override
-  public void setAutoFlush(boolean b, boolean b1) {
-    autoflush = b;
-  }
-
-  @Override
-  public void setAutoFlushTo(boolean b) {
-    autoflush = b;
-  }
-
-  long writeBufferSize = 0;
-  @Override
-  public long getWriteBufferSize() {
-    return writeBufferSize;
-  }
-
-  @Override
-  public void setWriteBufferSize(long l) throws IOException {
-    writeBufferSize = l;
-  }
-
-  @Override
-  public <R extends Message> Map<byte[], R> batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message message, byte[] bytes, byte[] bytes1, R r) throws ServiceException, Throwable {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message message, byte[] bytes, byte[] bytes1, R r, Batch.Callback<R> callback) throws ServiceException, Throwable {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * Atomically checks if a row/family/qualifier value matches the expected value.
-   * If it does, it performs the row mutations.  If the passed value is null, the check
-   * is for the lack of column (ie: non-existence)
-   *
-   * @param row       to check
-   * @param family    column family to check
-   * @param qualifier column qualifier to check
-   * @param compareOp the comparison operator
-   * @param value     the expected value
-   * @param mutation  mutations to perform if check succeeds
-   * @return true if the new put was executed, false otherwise
-   * @throws IOException e
-   */
-  @Override
-  public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/pom.xml b/metron-platform/pom.xml
index 7c61620..93ced81 100644
--- a/metron-platform/pom.xml
+++ b/metron-platform/pom.xml
@@ -41,6 +41,7 @@
 		</license>
 	</licenses>
 	<modules>
+		<module>metron-hbase-client</module>
 		<module>metron-common</module>
 		<module>metron-enrichment</module>
 		<module>metron-solr</module>


Mime
View raw message