metron-commits mailing list archives

From rmerri...@apache.org
Subject [3/3] metron git commit: METRON-1289 Alert fields are lost when a MetaAlert is created (merrimanr) closes apache/metron#824
Date Thu, 16 Nov 2017 01:35:48 GMT
METRON-1289 Alert fields are lost when a MetaAlert is created (merrimanr) closes apache/metron#824


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/fd896fbe
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/fd896fbe
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/fd896fbe

Branch: refs/heads/master
Commit: fd896fbebe9d5e77eb11d1ce953ab2b55cc84387
Parents: c4c930f
Author: merrimanr <merrimanr@gmail.com>
Authored: Wed Nov 15 19:35:18 2017 -0600
Committer: merrimanr <merrimanr@apache.org>
Committed: Wed Nov 15 19:35:18 2017 -0600

----------------------------------------------------------------------
 metron-interface/metron-rest/README.md          |   36 +-
 .../apache/metron/rest/config/IndexConfig.java  |    3 +-
 .../rest/controller/MetaAlertController.java    |   48 +-
 .../metron/rest/service/MetaAlertService.java   |   10 +
 .../rest/service/impl/MetaAlertServiceImpl.java |   31 +
 .../rest/service/impl/SearchServiceImpl.java    |   18 +-
 .../MetaAlertControllerIntegrationTest.java     |  120 +-
 .../UpdateControllerIntegrationTest.java        |    5 +-
 .../org/apache/metron/common/utils/KeyUtil.java |   50 +
 .../hbase/HBaseEnrichmentConverterTest.java     |   21 +
 .../elasticsearch/dao/ElasticsearchDao.java     |  115 +-
 .../dao/ElasticsearchMetaAlertDao.java          |  717 +++++-----
 .../elasticsearch/dao/MetaAlertStatus.java      |   34 -
 .../dao/ElasticsearchMetaAlertDaoTest.java      |  304 +---
 .../ElasticsearchMetaAlertIntegrationTest.java  | 1301 ++++++++++--------
 .../ElasticsearchUpdateIntegrationTest.java     |    4 +-
 .../enrichment/converter/EnrichmentKey.java     |   23 +-
 metron-platform/metron-indexing/README.md       |   17 +-
 metron-platform/metron-indexing/pom.xml         |    7 +
 .../apache/metron/indexing/dao/HBaseDao.java    |  128 +-
 .../apache/metron/indexing/dao/IndexDao.java    |   38 +-
 .../metron/indexing/dao/MetaAlertDao.java       |   91 +-
 .../metron/indexing/dao/MultiIndexDao.java      |   54 +
 .../metaalert/MetaAlertAddRemoveRequest.java    |   41 +
 .../dao/metaalert/MetaAlertCreateRequest.java   |   14 +-
 .../indexing/dao/metaalert/MetaAlertStatus.java |   34 +
 .../metron/indexing/dao/search/GetRequest.java  |   35 +-
 .../apache/metron/indexing/dao/InMemoryDao.java |   25 +-
 .../indexing/dao/InMemoryMetaAlertDao.java      |   96 +-
 .../indexing/dao/SearchIntegrationTest.java     |   32 +-
 .../integration/HBaseDaoIntegrationTest.java    |  164 +++
 31 files changed, 2219 insertions(+), 1397 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/README.md
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/README.md b/metron-interface/metron-rest/README.md
index b79b44d..724239b 100644
--- a/metron-interface/metron-rest/README.md
+++ b/metron-interface/metron-rest/README.md
@@ -218,6 +218,9 @@ Request and Response objects are JSON formatted.  The JSON schemas are available
 | [ `GET /api/v1/kafka/topic/{name}/sample`](#get-apiv1kafkatopicnamesample)|
 | [ `GET /api/v1/metaalert/searchByAlert`](#get-apiv1metaalertsearchbyalert)|
 | [ `GET /api/v1/metaalert/create`](#get-apiv1metaalertcreate)|
+| [ `GET /api/v1/metaalert/add/alert`](#get-apiv1metaalertaddalert)|
+| [ `GET /api/v1/metaalert/remove/alert`](#get-apiv1metaalertremovealert)|
+| [ `GET /api/v1/metaalert/update/status/{guid}/{status}`](#get-apiv1metaalertupdatestatusguidstatus)|
 | [ `GET /api/v1/search/search`](#get-apiv1searchsearch)|
 | [ `POST /api/v1/search/search`](#get-apiv1searchsearch)|
 | [ `POST /api/v1/search/group`](#get-apiv1searchgroup)|
@@ -415,19 +418,40 @@ Request and Response objects are JSON formatted.  The JSON schemas are available
     * 404 - Either Kafka topic is missing or contains no messages
 
 ### `POST /api/v1/metaalert/searchByAlert`
-  * Description: Searches meta alerts to find any containing an alert for the provided GUID
+  * Description: Get all meta alerts that contain an alert.
   * Input:
     * guid - GUID of the alert
   * Returns:
-    * 200 - Returns the meta alerts associated with this alert
-    * 404 - The child alert isn't found
+    * 200 - Search results
 
 ### `POST /api/v1/metaalert/create`
-  * Description: Creates a meta alert containing the provide alerts
+  * Description: Creates a new meta alert from a list of existing alerts.  The meta alert status will initially be set to 'ACTIVE' and summary statistics will be computed from the list of alerts.  A list of groups included in the request are also added to the meta alert.
   * Input:
-    * request - Meta Alert Create Request
+    * request - Meta alert create request which includes a list of alert get requests and a list of custom groups used to annotate a meta alert.
   * Returns:
-    * 200 - The meta alert was created
+    * 200 - The GUID of the new meta alert
+    
+### `POST /api/v1/metaalert/add/alert`
+  * Description: Adds an alert to an existing meta alert.  An alert will not be added if it is already contained in a meta alert.
+  * Input:
+    * request - Meta alert add request which includes a meta alert GUID and list of alert get requests
+  * Returns:
+    * 200 - Returns 'true' if the alert was added and 'false' if the meta alert did not change.
+        
+### `POST /api/v1/metaalert/remove/alert`
+  * Description: Removes an alert from an existing meta alert.  If the alert to be removed is not in a meta alert, 'false' will be returned.
+  * Input:
+    * request - Meta alert remove request which includes a meta alert GUID and list of alert get requests
+  * Returns:
+    * 200 - Returns 'true' if the alert was removed and 'false' if the meta alert did not change.
+            
+### `POST /api/v1/metaalert/update/status/{guid}/{status}`
+  * Description: Updates the status of a meta alert to either 'ACTIVE' or 'INACTIVE'.
+  * Input:
+    * guid - Meta alert GUID
+    * status - Meta alert status with a value of either 'ACTIVE' or 'INACTIVE'
+  * Returns:
+    * 200 - Returns 'true' if the status changed and 'false' if it did not.
 
 ### `POST /api/v1/search/search`
   * Description: Searches the indexing store. GUIDs must be quoted to ensure correct results.
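
The README text above describes the add/remove request only in words. A minimal sketch of what the JSON body might look like follows. It assumes the field names follow the getters used elsewhere in this commit (getMetaAlertGuid, getAlerts) and the guid/sensorType/index fields shown in the create-request fixture of MetaAlertControllerIntegrationTest. The class MetaAlertAddRemoveBody, its AlertRef type and the GUID "meta_1" are hypothetical and are not part of Metron.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Metron: builds a JSON body for
// POST /api/v1/metaalert/add/alert (the same shape is assumed for /remove/alert).
final class MetaAlertAddRemoveBody {

  // Mirrors the three values passed to GetRequest in the integration test.
  static final class AlertRef {
    final String guid;
    final String sensorType;
    final String index;

    AlertRef(String guid, String sensorType, String index) {
      this.guid = guid;
      this.sensorType = sensorType;
      this.index = index;
    }

    // No JSON escaping is done; values are assumed to be plain identifiers.
    String toJson() {
      return "{\"guid\":\"" + guid + "\",\"sensorType\":\"" + sensorType
          + "\",\"index\":\"" + index + "\"}";
    }
  }

  // Assumed field names: "metaAlertGuid" and "alerts".
  static String toJson(String metaAlertGuid, List<AlertRef> alerts) {
    String alertsJson = alerts.stream()
        .map(AlertRef::toJson)
        .collect(Collectors.joining(","));
    return "{\"metaAlertGuid\":\"" + metaAlertGuid + "\",\"alerts\":[" + alertsJson + "]}";
  }

  public static void main(String[] args) {
    System.out.println(toJson("meta_1", Arrays.asList(
        new AlertRef("bro_2", "bro", "bro_index_2017.01.01.01"),
        new AlertRef("bro_3", "bro", "bro_index_2017.01.01.01"))));
  }
}
```

In practice a JSON library would produce this body, as the integration test in this commit does with JSONUtils. The string building here only keeps the sketch free of dependencies.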

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
index 8eabb2e..4ce9644 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
@@ -19,6 +19,7 @@ package org.apache.metron.rest.config;
 
 import static org.apache.metron.rest.MetronRestConstants.INDEX_DAO_IMPL;
 
+import java.util.Optional;
 import org.apache.metron.hbase.HTableProvider;
 import org.apache.metron.hbase.TableProvider;
 import org.apache.metron.indexing.dao.AccessConfig;
@@ -81,7 +82,7 @@ public class IndexConfig {
 
       // Create the meta alert dao and wrap it around the index dao.
       MetaAlertDao ret = (MetaAlertDao) IndexDaoFactory.create(metaDaoImpl, config).get(0);
-      ret.init(indexDao, metaDaoSort);
+      ret.init(indexDao, Optional.ofNullable(metaDaoSort));
       return ret;
     }
     catch(RuntimeException re) {

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/MetaAlertController.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/MetaAlertController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/MetaAlertController.java
index e9cff8b..d42403a 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/MetaAlertController.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/MetaAlertController.java
@@ -21,6 +21,8 @@ package org.apache.metron.rest.controller;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertAddRemoveRequest;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
 import org.apache.metron.indexing.dao.search.SearchResponse;
@@ -29,6 +31,7 @@ import org.apache.metron.rest.service.MetaAlertService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
@@ -41,24 +44,59 @@ public class MetaAlertController {
   @Autowired
   private MetaAlertService metaAlertService;
 
-  @ApiOperation(value = "Get all meta alerts for alert")
+  @ApiOperation(value = "Get all meta alerts that contain an alert.")
   @ApiResponse(message = "Search results", code = 200)
   @RequestMapping(value = "/searchByAlert", method = RequestMethod.POST)
   ResponseEntity<SearchResponse> searchByAlert(
-      @ApiParam(name = "guid", value = "GUID", required = true)
+      @ApiParam(name = "guid", value = "Alert GUID", required = true)
       @RequestBody final String guid
   ) throws RestException {
     return new ResponseEntity<>(metaAlertService.getAllMetaAlertsForAlert(guid), HttpStatus.OK);
   }
 
-  @ApiOperation(value = "Create a meta alert")
-  @ApiResponse(message = "Created meta alert", code = 200)
+  @ApiOperation(value = "Creates a new meta alert from a list of existing alerts.  "
+      + "The meta alert status will initially be set to 'ACTIVE' and summary statistics "
+      + "will be computed from the list of alerts.  A list of groups included in the request are also added to the meta alert.")
+  @ApiResponse(message = "The GUID of the new meta alert", code = 200)
   @RequestMapping(value = "/create", method = RequestMethod.POST)
   ResponseEntity<MetaAlertCreateResponse> create(
-      @ApiParam(name = "request", value = "Meta Alert Create Request", required = true)
+      @ApiParam(name = "createRequest", value = "Meta alert create request which includes a list of alert "
+          + "get requests and a list of custom groups used to annotate a meta alert", required = true)
       @RequestBody  final MetaAlertCreateRequest createRequest
   ) throws RestException {
     return new ResponseEntity<>(metaAlertService.create(createRequest), HttpStatus.OK);
   }
+
+  @ApiOperation(value = "Adds an alert to an existing meta alert.  An alert will not be added if it is already contained in a meta alert.")
+  @ApiResponse(message = "Returns 'true' if the alert was added and 'false' if the meta alert did not change.", code = 200)
+  @RequestMapping(value = "/add/alert", method = RequestMethod.POST)
+  ResponseEntity<Boolean> addAlertsToMetaAlert(
+      @ApiParam(name = "metaAlertAddRemoveRequest", value = "Meta alert add request which includes a meta alert GUID and list of alert get requests", required = true)
+      @RequestBody  final MetaAlertAddRemoveRequest metaAlertAddRemoveRequest
+  ) throws RestException {
+    return new ResponseEntity<>(metaAlertService.addAlertsToMetaAlert(metaAlertAddRemoveRequest), HttpStatus.OK);
+  }
+
+  @ApiOperation(value = "Removes an alert from an existing meta alert.  If the alert to be removed is not in a meta alert, 'false' will be returned.")
+  @ApiResponse(message = "Returns 'true' if the alert was removed and 'false' if the meta alert did not change.", code = 200)
+  @RequestMapping(value = "/remove/alert", method = RequestMethod.POST)
+  ResponseEntity<Boolean> removeAlertsFromMetaAlert(
+      @ApiParam(name = "metaAlertAddRemoveRequest", value = "Meta alert remove request which includes a meta alert GUID and list of alert get requests", required = true)
+      @RequestBody  final MetaAlertAddRemoveRequest metaAlertAddRemoveRequest
+  ) throws RestException {
+    return new ResponseEntity<>(metaAlertService.removeAlertsFromMetaAlert(metaAlertAddRemoveRequest), HttpStatus.OK);
+  }
+
+  @ApiOperation(value = "Updates the status of a meta alert to either 'ACTIVE' or 'INACTIVE'.")
+  @ApiResponse(message = "Returns 'true' if the status changed and 'false' if it did not.", code = 200)
+  @RequestMapping(value = "/update/status/{guid}/{status}", method = RequestMethod.POST)
+  ResponseEntity<Boolean> updateMetaAlertStatus(
+      final @ApiParam(name = "guid", value = "Meta alert GUID", required = true)
+      @PathVariable String guid,
+      final @ApiParam(name = "status", value = "Meta alert status with a value of either 'ACTIVE' or 'INACTIVE'", required = true)
+      @PathVariable String status) throws RestException {
+    return new ResponseEntity<>(metaAlertService.updateMetaAlertStatus(guid,
+        MetaAlertStatus.valueOf(status.toUpperCase())), HttpStatus.OK);
+  }
 }
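
The status endpoint above resolves the {status} path segment with MetaAlertStatus.valueOf(status.toUpperCase()). A small sketch of that conversion follows, using a local stand-in enum so it runs without Metron on the classpath. The class name MetaAlertStatusParsing and the use of Locale.ROOT are assumptions added for illustration. The commit calls toUpperCase() with no argument.

```java
import java.util.Locale;

// Hypothetical sketch, not part of Metron.
final class MetaAlertStatusParsing {

  // Local stand-in for org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
  // the two constants are the values named in the endpoint description.
  enum Status { ACTIVE, INACTIVE }

  // Upper-cases the path segment and resolves it against the enum.
  // Enum.valueOf throws IllegalArgumentException for any other value.
  static Status parse(String pathSegment) {
    return Status.valueOf(pathSegment.toUpperCase(Locale.ROOT));
  }

  public static void main(String[] args) {
    System.out.println(parse("inactive"));
    try {
      parse("archived");
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: archived");
    }
  }
}
```

The point of the sketch is that the path segment is case-insensitive, which is why the integration test can post ".../inactive", while any value other than the two constants raises an exception. How the REST layer reports that exception to the caller is not shown in this diff.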
 

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/MetaAlertService.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/MetaAlertService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/MetaAlertService.java
index c339506..e8abaf3 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/MetaAlertService.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/MetaAlertService.java
@@ -18,8 +18,12 @@
 
 package org.apache.metron.rest.service;
 
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertAddRemoveRequest;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.rest.RestException;
 
@@ -28,4 +32,10 @@ public interface MetaAlertService {
   MetaAlertCreateResponse create(MetaAlertCreateRequest createRequest) throws RestException;
 
   SearchResponse getAllMetaAlertsForAlert(String guid) throws RestException;
+
+  boolean addAlertsToMetaAlert(MetaAlertAddRemoveRequest metaAlertAddRemoveRequest) throws RestException;
+
+  boolean removeAlertsFromMetaAlert(MetaAlertAddRemoveRequest metaAlertAddRemoveRequest) throws RestException;
+
+  boolean updateMetaAlertStatus(String metaAlertGuid, MetaAlertStatus status) throws RestException;
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
index f120c9e..aafab24 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
@@ -19,10 +19,13 @@
 package org.apache.metron.rest.service.impl;
 
 import java.io.IOException;
+import java.util.Collection;
 import org.apache.metron.indexing.dao.IndexDao;
 import org.apache.metron.indexing.dao.MetaAlertDao;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertAddRemoveRequest;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
 import org.apache.metron.indexing.dao.search.InvalidCreateException;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
@@ -63,4 +66,32 @@ public class MetaAlertServiceImpl implements MetaAlertService {
       throw new RestException(ise.getMessage(), ise);
     }
   }
+
+  @Override
+  public boolean addAlertsToMetaAlert(MetaAlertAddRemoveRequest metaAlertAddRemoveRequest) throws RestException {
+    try {
+      return dao.addAlertsToMetaAlert(metaAlertAddRemoveRequest.getMetaAlertGuid(), metaAlertAddRemoveRequest.getAlerts());
+    } catch (IOException ioe) {
+      throw new RestException(ioe.getMessage(), ioe);
+    }
+  }
+
+  @Override
+  public boolean removeAlertsFromMetaAlert(MetaAlertAddRemoveRequest metaAlertAddRemoveRequest) throws RestException {
+    try {
+      return dao.removeAlertsFromMetaAlert(metaAlertAddRemoveRequest.getMetaAlertGuid(), metaAlertAddRemoveRequest.getAlerts());
+    } catch (IOException ioe) {
+      throw new RestException(ioe.getMessage(), ioe);
+    }
+  }
+
+  @Override
+  public boolean updateMetaAlertStatus(String metaAlertGuid, MetaAlertStatus status)
+      throws RestException {
+    try {
+      return dao.updateMetaAlertStatus(metaAlertGuid, status);
+    } catch (IOException ioe) {
+      throw new RestException(ioe.getMessage(), ioe);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java
index efd80a7..433eae3 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java
@@ -58,13 +58,10 @@ public class SearchServiceImpl implements SearchService {
   @Override
   public SearchResponse search(SearchRequest searchRequest) throws RestException {
     try {
-      // Pull the indices from the cache by default
       if (searchRequest.getIndices() == null || searchRequest.getIndices().isEmpty()) {
-        List<String> indices = Lists.newArrayList((sensorIndexingConfigService.getAllIndices(environment.getProperty(INDEX_WRITER_NAME))));
-        // metaalerts should be included by default
+        List<String> indices = getDefaultIndices();
+        // metaalerts should be included by default in search requests
         indices.add(METAALERT_TYPE);
-        // errors should not be included by default
-        indices.remove(ERROR_TYPE);
         searchRequest.setIndices(indices);
       }
       return dao.search(searchRequest);
@@ -77,6 +74,9 @@ public class SearchServiceImpl implements SearchService {
   @Override
   public GroupResponse group(GroupRequest groupRequest) throws RestException {
     try {
+      if (groupRequest.getIndices() == null || groupRequest.getIndices().isEmpty()) {
+        groupRequest.setIndices(getDefaultIndices());
+      }
       return dao.group(groupRequest);
     }
     catch(InvalidSearchException ise) {
@@ -112,4 +112,12 @@ public class SearchServiceImpl implements SearchService {
       throw new RestException(ioe.getMessage(), ioe);
     }
   }
+
+  private List<String> getDefaultIndices() throws RestException {
+    // Pull the indices from the cache by default
+    List<String> indices = Lists.newArrayList((sensorIndexingConfigService.getAllIndices(environment.getProperty(INDEX_WRITER_NAME))));
+    // errors should not be included by default
+    indices.remove(ERROR_TYPE);
+    return indices;
+  }
 }
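
The SearchServiceImpl change above gives search and group requests a shared default index list when the request names none: errors are dropped for both, and the meta alert type is appended for search only. A self-contained sketch of that rule follows. The class DefaultIndicesSketch is hypothetical, and the string values chosen for METAALERT_TYPE and ERROR_TYPE are placeholders because the real constant values are not shown in this diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not part of Metron.
final class DefaultIndicesSketch {

  // Placeholder values; the real constants are defined elsewhere in Metron.
  static final String METAALERT_TYPE = "metaalert";
  static final String ERROR_TYPE = "error";

  // Shared default: every configured index except the error index.
  static List<String> defaultIndices(List<String> configured) {
    List<String> indices = new ArrayList<>(configured);
    indices.remove(ERROR_TYPE);
    return indices;
  }

  // Search: defaults plus the meta alert type when no indices were requested.
  static List<String> searchIndices(List<String> requested, List<String> configured) {
    if (requested == null || requested.isEmpty()) {
      List<String> indices = defaultIndices(configured);
      indices.add(METAALERT_TYPE);
      return indices;
    }
    return requested;
  }

  // Group: defaults only when no indices were requested.
  static List<String> groupIndices(List<String> requested, List<String> configured) {
    if (requested == null || requested.isEmpty()) {
      return defaultIndices(configured);
    }
    return requested;
  }
}
```

An explicit index list in the request is passed through untouched in both cases, which matches the guard conditions in the diff.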

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java
index 983c207..b0dd774 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java
@@ -28,11 +28,22 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
 
 import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.indexing.dao.InMemoryMetaAlertDao;
 import org.apache.metron.indexing.dao.MetaAlertDao;
 import org.apache.metron.indexing.dao.SearchIntegrationTest;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertAddRemoveRequest;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
+import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.rest.service.MetaAlertService;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -67,10 +78,18 @@ public class MetaAlertControllerIntegrationTest extends DaoControllerTest {
 
   /**
    {
-   "guidToIndices" : {
-   "bro_1":"bro_index_2017.01.01.01",
-   "snort_2":"snort_index_2017.01.01.01"
+   "alerts" : [
+   {
+   "guid": "bro_1",
+   "sensorType": "bro",
+   "index": "bro_index_2017.01.01.01"
    },
+   {
+   "guid": "snort_2",
+   "sensorType": "snort",
+   "index": "snort_index_2017.01.01.01"
+   }
+   ],
    "groups" : ["group_one", "group_two"]
    }
    */
@@ -88,6 +107,11 @@ public class MetaAlertControllerIntegrationTest extends DaoControllerTest {
     loadTestData(testData);
   }
 
+  @After
+  public void cleanup() {
+    InMemoryMetaAlertDao.clear();
+  }
+
   @Test
   public void test() throws Exception {
     // Testing searching by alert
@@ -171,4 +195,94 @@ public class MetaAlertControllerIntegrationTest extends DaoControllerTest {
         .andExpect(jsonPath("$.results[0].source.guid").value("meta_3"))
         .andExpect(jsonPath("$.results[0].source.count").value(2.0));
   }
+
+  @Test
+  public void shouldAddRemoveAlerts() throws Exception {
+    MetaAlertCreateRequest metaAlertCreateRequest = new MetaAlertCreateRequest();
+    metaAlertCreateRequest.setGroups(Arrays.asList("group_one", "group_two"));
+    metaAlertCreateRequest.setAlerts(new ArrayList<GetRequest>() {{
+      add(new GetRequest("bro_1", "bro", "bro_index_2017.01.01.01"));
+      add(new GetRequest("snort_2", "snort", "snort_index_2017.01.01.01"));
+    }});
+    MetaAlertCreateResponse metaAlertCreateResponse = metaAlertService.create(metaAlertCreateRequest);
+
+    MetaAlertAddRemoveRequest addRequest = new MetaAlertAddRemoveRequest();
+    addRequest.setMetaAlertGuid(metaAlertCreateResponse.getGuid());
+    addRequest.setAlerts(new ArrayList<GetRequest>() {{
+      add(new GetRequest("bro_2", "bro", "bro_index_2017.01.01.01"));
+      add(new GetRequest("bro_3", "bro", "bro_index_2017.01.01.01"));
+    }});
+
+    ResultActions result = this.mockMvc.perform(
+        post(metaalertUrl + "/add/alert")
+            .with(httpBasic(user, password)).with(csrf())
+            .contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))
+            .content(JSONUtils.INSTANCE.toJSON(addRequest, false)));
+    result.andExpect(status().isOk()).andExpect(content().string("true"));
+
+    MetaAlertAddRemoveRequest addDuplicateRequest = new MetaAlertAddRemoveRequest();
+    addDuplicateRequest.setMetaAlertGuid(metaAlertCreateResponse.getGuid());
+    addDuplicateRequest.setAlerts(new ArrayList<GetRequest>() {{
+      add(new GetRequest("bro_1", "bro"));
+    }});
+
+    result = this.mockMvc.perform(
+        post(metaalertUrl + "/add/alert")
+            .with(httpBasic(user, password)).with(csrf())
+            .contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))
+            .content(JSONUtils.INSTANCE.toJSON(addDuplicateRequest, false)));
+    result.andExpect(status().isOk()).andExpect(content().string("false"));
+
+    MetaAlertAddRemoveRequest removeRequest = new MetaAlertAddRemoveRequest();
+    removeRequest.setMetaAlertGuid(metaAlertCreateResponse.getGuid());
+    removeRequest.setAlerts(new ArrayList<GetRequest>() {{
+      add(new GetRequest("bro_2", "bro"));
+      add(new GetRequest("bro_3", "bro"));
+    }});
+
+    result = this.mockMvc.perform(
+        post(metaalertUrl + "/remove/alert")
+            .with(httpBasic(user, password)).with(csrf())
+            .contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))
+            .content(JSONUtils.INSTANCE.toJSON(removeRequest, false)));
+    result.andExpect(status().isOk()).andExpect(content().string("true"));
+
+    MetaAlertAddRemoveRequest removeMissingRequest = new MetaAlertAddRemoveRequest();
+    addRequest.setMetaAlertGuid(metaAlertCreateResponse.getGuid());
+    removeMissingRequest.setAlerts(new ArrayList<GetRequest>() {{
+      add(new GetRequest("bro_1", "bro"));
+    }});
+
+    result = this.mockMvc.perform(
+        post(metaalertUrl + "/remove/alert")
+            .with(httpBasic(user, password)).with(csrf())
+            .contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))
+            .content(JSONUtils.INSTANCE.toJSON(removeMissingRequest, false)));
+    result.andExpect(status().isOk()).andExpect(content().string("false"));
+  }
+
+  @Test
+  public void shouldUpdateStatus() throws Exception {
+    MetaAlertCreateRequest metaAlertCreateRequest = new MetaAlertCreateRequest();
+    metaAlertCreateRequest.setGroups(Arrays.asList("group_one", "group_two"));
+    metaAlertCreateRequest.setAlerts(new ArrayList<GetRequest>() {{
+      add(new GetRequest("bro_1", "bro", "bro_index_2017.01.01.01"));
+      add(new GetRequest("snort_2", "snort", "snort_index_2017.01.01.01"));
+    }});
+
+    MetaAlertCreateResponse metaAlertCreateResponse = metaAlertService.create(metaAlertCreateRequest);
+
+    ResultActions result = this.mockMvc.perform(
+        post(metaalertUrl + "/update/status/" + metaAlertCreateResponse.getGuid() + "/inactive")
+            .with(httpBasic(user, password)).with(csrf())
+            .contentType(MediaType.parseMediaType("application/json;charset=UTF-8")));
+    result.andExpect(status().isOk()).andExpect(content().string("true"));
+
+    result = this.mockMvc.perform(
+        post(metaalertUrl + "/update/status/" + metaAlertCreateResponse.getGuid() + "/inactive")
+            .with(httpBasic(user, password)).with(csrf())
+            .contentType(MediaType.parseMediaType("application/json;charset=UTF-8")));
+    result.andExpect(status().isOk()).andExpect(content().string("false"));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
index 4708bc4..57a1b28 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.metron.hbase.mock.MockHTable;
 import org.apache.metron.hbase.mock.MockHBaseTableProvider;
+import org.apache.metron.indexing.dao.HBaseDao;
 import org.apache.metron.indexing.dao.MetaAlertDao;
 import org.apache.metron.indexing.dao.SearchIntegrationTest;
 import org.apache.metron.rest.service.UpdateService;
@@ -161,7 +162,7 @@ public class UpdateControllerIntegrationTest extends DaoControllerTest {
     Assert.assertEquals(1,table.size());
     {
         //ensure hbase is up to date
-        Get g = new Get(guid.getBytes());
+        Get g = new Get(new HBaseDao.Key(guid,"bro").toBytes());
         Result r = table.get(g);
         NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes());
         Assert.assertEquals(1, columns.size());
@@ -183,7 +184,7 @@ public class UpdateControllerIntegrationTest extends DaoControllerTest {
     Assert.assertEquals(1,table.size());
     {
         //ensure hbase is up to date
-        Get g = new Get(guid.getBytes());
+        Get g = new Get(new HBaseDao.Key(guid, "bro").toBytes());
         Result r = table.get(g);
         NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes());
         Assert.assertEquals(2, columns.size());

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KeyUtil.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KeyUtil.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KeyUtil.java
new file mode 100644
index 0000000..595a839
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KeyUtil.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.utils;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
+public enum KeyUtil {
+  INSTANCE;
+  private static final int SEED = 0xDEADBEEF;
+  public static final int HASH_PREFIX_SIZE=16;
+  ThreadLocal<HashFunction> hFunction= new ThreadLocal<HashFunction>() {
+    @Override
+    protected HashFunction initialValue() {
+      return Hashing.murmur3_128(SEED);
+    }
+  };
+
+  public byte[] getPrefix(byte[] key) {
+    Hasher hasher = hFunction.get().newHasher();
+    hasher.putBytes(key);
+    return hasher.hash().asBytes();
+  }
+
+  public byte[] merge(byte[] prefix, byte[] key) {
+    byte[] val = new byte[key.length + prefix.length];
+    int offset = 0;
+    System.arraycopy(prefix, 0, val, offset, prefix.length);
+    offset += prefix.length;
+    System.arraycopy(key, 0, val, offset, key.length);
+    return val;
+  }
+
+}
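
For readers unfamiliar with salted row keys, the following is a minimal standalone sketch of what KeyUtil appears to do: hash the key into a fixed 16-byte prefix, then prepend that prefix to the original bytes. It assumes nothing beyond the diff above. The class name SaltedKeySketch is hypothetical, and MD5 is swapped in for Guava's murmur3_128 only so the example runs on a bare JDK; it will not produce the same prefix bytes as Metron.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical illustration class; not part of the commit.
class SaltedKeySketch {
  // Same width as KeyUtil.HASH_PREFIX_SIZE in the diff above.
  static final int HASH_PREFIX_SIZE = 16;

  // Stand-in for KeyUtil.getPrefix. MD5 is NOT what Metron uses; it is chosen
  // here only because it ships with the JDK and also yields 16 bytes.
  public static byte[] prefix(byte[] key) {
    try {
      return MessageDigest.getInstance("MD5").digest(key);
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 is unavailable", e);
    }
  }

  // Mirrors KeyUtil.merge: prefix bytes first, then the original key bytes.
  public static byte[] merge(byte[] prefix, byte[] key) {
    byte[] val = new byte[prefix.length + key.length];
    System.arraycopy(prefix, 0, val, 0, prefix.length);
    System.arraycopy(key, 0, val, prefix.length, key.length);
    return val;
  }

  public static void main(String[] args) {
    byte[] key = "google".getBytes(StandardCharsets.UTF_8);
    byte[] rowKey = merge(prefix(key), key);
    System.out.println("row key length: " + rowKey.length);
  }
}
```

Because the prefix is derived only from the key, the same key always maps to the same row key, which is what lets a reader rebuild the row key for a lookup.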

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java
index a018e27..fff1d9b 100644
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java
@@ -34,6 +34,15 @@ import java.util.HashMap;
 
 
 public class HBaseEnrichmentConverterTest {
+    public static byte[] keyBytes = new byte[] {
+            0x31,(byte)0xc2,0x49,0x05,0x6b,(byte)0xea,
+            0x0e,0x59,(byte)0xe1,(byte)0xad,(byte)0xa0,0x24,
+            0x55,(byte)0xa9,0x6b,0x63,0x00,0x06,
+            0x64,0x6f,0x6d,0x61,0x69,0x6e,
+            0x00,0x06,0x67,0x6f,0x6f,0x67,
+            0x6c,0x65
+    };
+
     EnrichmentKey key = new EnrichmentKey("domain", "google");
     EnrichmentValue value = new EnrichmentValue(
             new HashMap<String, Object>() {{
@@ -41,6 +50,18 @@ public class HBaseEnrichmentConverterTest {
                 put("grok", "baz");
             }});
     LookupKV<EnrichmentKey, EnrichmentValue> results = new LookupKV(key, value);
+
+    /**
+     * If this test fails, your change has altered how keys are serialized, so keys written by
+     * previous releases can no longer be found under your scheme.  Either provide a migration
+     * plan or undo the change.
+     * DO NOT CHANGE THIS TEST BLITHELY!
+     */
+    @Test
+    public void testKeySerializationRemainsConstant() {
+        byte[] raw = key.toBytes();
+        Assert.assertArrayEquals(keyBytes, raw);
+    }
     @Test
     public void testKeySerialization() {
         byte[] serialized = key.toBytes();
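
The keyBytes fixture above can be read by hand: 16 hash-prefix bytes, followed by two fields that each carry a 2-byte big-endian length (0x00 0x06) and six ASCII bytes. The sketch below decodes that tail with DataInputStream.readUTF, on the assumption (not confirmed by this diff) that EnrichmentKey writes its type and indicator in that length-prefixed form. The class name KeyLayoutSketch is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical illustration class; not part of the commit.
class KeyLayoutSketch {
  static final int HASH_PREFIX_SIZE = 16;

  // Copied from the keyBytes fixture in the test above.
  static final byte[] KEY_BYTES = {
      0x31, (byte) 0xc2, 0x49, 0x05, 0x6b, (byte) 0xea,
      0x0e, 0x59, (byte) 0xe1, (byte) 0xad, (byte) 0xa0, 0x24,
      0x55, (byte) 0xa9, 0x6b, 0x63, 0x00, 0x06,
      0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e,
      0x00, 0x06, 0x67, 0x6f, 0x6f, 0x67,
      0x6c, 0x65
  };

  // Skips the hash prefix and reads two length-prefixed strings.
  // Assumption: the fields were written in the DataOutputStream.writeUTF format.
  public static String[] decodeSuffix(byte[] key) {
    byte[] suffix = Arrays.copyOfRange(key, HASH_PREFIX_SIZE, key.length);
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(suffix))) {
      String type = in.readUTF();
      String indicator = in.readUTF();
      return new String[] {type, indicator};
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    String[] parts = decodeSuffix(KEY_BYTES);
    System.out.println(parts[0] + " / " + parts[1]);
  }
}
```

Decoding the fixture this way yields the same "domain" and "google" values used to construct the EnrichmentKey in the test, which is a quick sanity check that the constant was captured correctly.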

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index f114b4c..61d5472 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -25,9 +25,11 @@ import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +40,7 @@ import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
 import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.indexing.dao.search.Group;
 import org.apache.metron.indexing.dao.search.GroupOrder;
 import org.apache.metron.indexing.dao.search.GroupOrderType;
@@ -55,10 +58,9 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.action.update.UpdateResponse;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -256,40 +258,73 @@ public class ElasticsearchDao implements IndexDao {
     return ret.orElse(null);
   }
 
+  @Override
+  public Iterable<Document> getAllLatest(
+      final List<GetRequest> getRequests) throws IOException {
+    Collection<String> guids = new HashSet<>();
+    Collection<String> sensorTypes = new HashSet<>();
+    for (GetRequest getRequest: getRequests) {
+      guids.add(getRequest.getGuid());
+      sensorTypes.add(getRequest.getSensorType());
+    }
+    List<Document> documents = searchByGuids(
+        guids
+        , sensorTypes
+        , hit -> {
+          Long ts = 0L;
+          String doc = hit.getSourceAsString();
+          String sourceType = Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null);
+          try {
+            return Optional.of(new Document(doc, hit.getId(), sourceType, ts));
+          } catch (IOException e) {
+            throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e);
+          }
+        }
+
+    );
+    return documents;
+  }
+
+  <T> Optional<T> searchByGuid(String guid, String sensorType,
+      Function<SearchHit, Optional<T>> callback) {
+    Collection<String> sensorTypes = sensorType != null ? Collections.singleton(sensorType) : null;
+    List<T> results = searchByGuids(Collections.singleton(guid), sensorTypes, callback);
+    if (results.size() > 0) {
+      return Optional.of(results.get(0));
+    } else {
+      return Optional.empty();
+    }
+  }
+
   /**
    * Return the search hit based on the UUID and sensor type.
    * A callback can be specified to transform the hit into a type T.
    * If more than one hit happens, the first one will be returned.
    */
-  <T> Optional<T> searchByGuid(String guid, String sensorType,
+  <T> List<T> searchByGuids(Collection<String> guids, Collection<String> sensorTypes,
       Function<SearchHit, Optional<T>> callback) {
     QueryBuilder query;
-    if (sensorType != null) {
-      query = QueryBuilders.idsQuery(sensorType + "_doc").ids(guid);
+    if (sensorTypes != null) {
+      String[] types = sensorTypes.stream().map(sensorType -> sensorType + "_doc").toArray(String[]::new);
+      query = QueryBuilders.idsQuery(types).ids(guids);
     } else {
-      query = QueryBuilders.idsQuery().ids(guid);
+      query = QueryBuilders.idsQuery().ids(guids);
     }
     SearchRequestBuilder request = client.prepareSearch()
                                          .setQuery(query)
                                          .setSource("message")
+                                         .setSize(guids.size())
                                          ;
     org.elasticsearch.action.search.SearchResponse response = request.get();
     SearchHits hits = response.getHits();
-    long totalHits = hits.getTotalHits();
-    if (totalHits > 1) {
-      LOG.warn("Encountered {} results for guid {} in sensor {}. Returning first hit.",
-          totalHits,
-          guid,
-          sensorType
-      );
-    }
+    List<T> results = new ArrayList<>();
     for (SearchHit hit : hits) {
-      Optional<T> ret = callback.apply(hit);
-      if (ret.isPresent()) {
-        return ret;
+      Optional<T> result = callback.apply(hit);
+      if (result.isPresent()) {
+        results.add(result.get());
       }
     }
-    return Optional.empty();
+    return results;
   }
 
   @Override
@@ -297,18 +332,17 @@ public class ElasticsearchDao implements IndexDao {
     String indexPostfix = ElasticsearchUtils
         .getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date());
     String sensorType = update.getSensorType();
-    String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, null);
-    String existingIndex = calculateExistingIndex(update, index, indexPostfix);
+    String indexName = getIndexName(update, index, indexPostfix);
 
-    UpdateRequest updateRequest = buildUpdateRequest(update, sensorType, indexName, existingIndex);
+    IndexRequest indexRequest = buildIndexRequest(update, sensorType, indexName);
     try {
-      UpdateResponse response = client.update(updateRequest).get();
+      IndexResponse response = client.index(indexRequest).get();
 
       ShardInfo shardInfo = response.getShardInfo();
       int failed = shardInfo.getFailed();
       if (failed > 0) {
         throw new IOException(
-            "ElasticsearchDao upsert failed: " + Arrays.toString(shardInfo.getFailures()));
+            "ElasticsearchDao index failed: " + Arrays.toString(shardInfo.getFailures()));
       }
     } catch (Exception e) {
       throw new IOException(e.getMessage(), e);
@@ -326,16 +360,14 @@ public class ElasticsearchDao implements IndexDao {
     for (Map.Entry<Document, Optional<String>> updateEntry : updates.entrySet()) {
       Document update = updateEntry.getKey();
       String sensorType = update.getSensorType();
-      String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, null);
-      String existingIndex = calculateExistingIndex(update, updateEntry.getValue(), indexPostfix);
-      UpdateRequest updateRequest = buildUpdateRequest(
+      String indexName = getIndexName(update, updateEntry.getValue(), indexPostfix);
+      IndexRequest indexRequest = buildIndexRequest(
           update,
           sensorType,
-          indexName,
-          existingIndex
+          indexName
       );
 
-      bulkRequestBuilder.add(updateRequest);
+      bulkRequestBuilder.add(indexRequest);
     }
 
     BulkResponse bulkResponse = bulkRequestBuilder.get();
@@ -346,21 +378,20 @@ public class ElasticsearchDao implements IndexDao {
     }
   }
 
-  protected String calculateExistingIndex(Document update, Optional<String> index,
-      String indexPostFix) {
-    String sensorType = update.getSensorType();
-    String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostFix, null);
+  protected String getIndexName(Document update, Optional<String> index, String indexPostFix) {
+      return index.orElse(getIndexName(update.getGuid(), update.getSensorType())
+                  .orElse(ElasticsearchUtils.getIndexName(update.getSensorType(), indexPostFix, null))
+      );
+  }
 
-    return index.orElse(
-        searchByGuid(update.getGuid(),
-            sensorType,
-            hit -> Optional.ofNullable(hit.getIndex())
-        ).orElse(indexName)
+  protected Optional<String> getIndexName(String guid, String sensorType) {
+    return searchByGuid(guid,
+        sensorType,
+        hit -> Optional.ofNullable(hit.getIndex())
     );
   }
 
-  protected UpdateRequest buildUpdateRequest(Document update, String sensorType, String indexName,
-      String existingIndex) {
+  protected IndexRequest buildIndexRequest(Document update, String sensorType, String indexName) {
     String type = sensorType + "_doc";
     Object ts = update.getTimestamp();
     IndexRequest indexRequest = new IndexRequest(indexName, type, update.getGuid())
@@ -370,9 +401,7 @@ public class ElasticsearchDao implements IndexDao {
       indexRequest = indexRequest.timestamp(ts.toString());
     }
 
-    return new UpdateRequest(existingIndex, type, update.getGuid())
-        .doc(update.getDocument())
-        .upsert(indexRequest);
+    return indexRequest;
   }
 
   @SuppressWarnings("unchecked")
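
One detail of the new getIndexName(Document, Optional<String>, String) helper may be worth a look: Optional.orElse takes a value rather than a supplier, so Java evaluates its argument before the call. The GUID lookup therefore runs even when the caller supplied an index. The sketch below contrasts that with orElseGet, which defers the fallback. All names here (IndexNameFallbackSketch, lookupIndex, the index strings) are hypothetical stand-ins, and the lookup is a counter-instrumented stub rather than an Elasticsearch search.

```java
import java.util.Optional;

// Hypothetical illustration class; not part of the commit.
class IndexNameFallbackSketch {
  // Counts how often the (stubbed) lookup runs.
  static int lookups = 0;

  // Stand-in for a search-backed lookup such as getIndexName(guid, sensorType).
  public static Optional<String> lookupIndex(String guid) {
    lookups++;
    return "known-guid".equals(guid)
        ? Optional.of("bro_index_2017.11.15")
        : Optional.empty();
  }

  // Same shape as the helper in the diff: the lookup is always evaluated.
  public static String resolveEager(Optional<String> requested, String guid, String dflt) {
    return requested.orElse(lookupIndex(guid).orElse(dflt));
  }

  // Variant: the lookup only runs when no index was requested.
  public static String resolveLazy(Optional<String> requested, String guid, String dflt) {
    return requested.orElseGet(() -> lookupIndex(guid).orElse(dflt));
  }

  public static void main(String[] args) {
    Optional<String> requested = Optional.of("metaalerts");
    System.out.println(resolveEager(requested, "known-guid", "default_index"));
    System.out.println("lookups after eager: " + lookups);
    System.out.println(resolveLazy(requested, "known-guid", "default_index"));
    System.out.println("lookups after lazy: " + lookups);
  }
}
```

Both variants return the same index name; they differ only in whether the lookup cost is paid when an index is already known. Whether that cost matters for this DAO is a judgment for the committers.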

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
index eef134f..c24ba0c 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
@@ -18,18 +18,21 @@
 
 package org.apache.metron.elasticsearch.dao;
 
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.apache.metron.common.Constants.GUID;
 import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
 import static org.elasticsearch.index.query.QueryBuilders.constantScoreQuery;
 import static org.elasticsearch.index.query.QueryBuilders.existsQuery;
 import static org.elasticsearch.index.query.QueryBuilders.nestedQuery;
 import static org.elasticsearch.index.query.QueryBuilders.termQuery;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -37,7 +40,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
-import org.apache.commons.collections4.SetUtils;
 import org.apache.metron.common.Constants;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
@@ -45,8 +47,10 @@ import org.apache.metron.indexing.dao.MetaAlertDao;
 import org.apache.metron.indexing.dao.MultiIndexDao;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
 import org.apache.metron.indexing.dao.metaalert.MetaScores;
 import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.indexing.dao.search.GroupRequest;
 import org.apache.metron.indexing.dao.search.GroupResponse;
 import org.apache.metron.indexing.dao.search.InvalidCreateException;
@@ -55,31 +59,26 @@ import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.search.SearchResult;
 import org.apache.metron.indexing.dao.update.Document;
-import org.elasticsearch.action.ActionWriteResponse.ShardInfo;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.get.MultiGetItemResponse;
-import org.elasticsearch.action.get.MultiGetRequest.Item;
-import org.elasticsearch.action.get.MultiGetRequestBuilder;
-import org.elasticsearch.action.get.MultiGetResponse;
-import org.elasticsearch.action.index.IndexRequest;
+import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
+import org.apache.metron.indexing.dao.update.PatchRequest;
 import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.action.update.UpdateResponse;
-import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.QueryStringQueryBuilder;
 import org.elasticsearch.index.query.support.QueryInnerHitBuilder;
-import org.elasticsearch.search.SearchHit;
 
 public class ElasticsearchMetaAlertDao implements MetaAlertDao {
 
-  private static final String SOURCE_TYPE = Constants.SENSOR_TYPE.replace('.', ':');
+  public static final String SOURCE_TYPE = Constants.SENSOR_TYPE.replace('.', ':');
+  private static final String STATUS_PATH = "/status";
+  private static final String ALERT_PATH = "/alert";
+
   private IndexDao indexDao;
   private ElasticsearchDao elasticsearchDao;
   private String index = METAALERTS_INDEX;
   private String threatTriageField = THREAT_FIELD_DEFAULT;
   private String threatSort = THREAT_SORT_DEFAULT;
+  private int pageSize = 500;
 
   /**
    * Wraps an {@link org.apache.metron.indexing.dao.IndexDao} to handle meta alerts.
@@ -96,7 +95,7 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
    */
   public ElasticsearchMetaAlertDao(IndexDao indexDao, String index, String triageLevelField,
       String threatSort) {
-    init(indexDao, threatSort);
+    init(indexDao, Optional.of(threatSort));
     this.index = index;
     this.threatTriageField = triageLevelField;
   }
@@ -105,8 +104,14 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
     //uninitialized.
   }
 
+  /**
+   * Initializes this implementation by setting the supplied IndexDao and also setting a separate ElasticsearchDao.
+   * This is needed for some specific Elasticsearch functions (looking up an index from a GUID for example).
+   * @param indexDao The DAO to wrap for our queries
+   * @param threatSort The aggregation to use as the threat field. E.g. "sum", "median", etc.
+   */
   @Override
-  public void init(IndexDao indexDao, String threatSort) {
+  public void init(IndexDao indexDao, Optional<String> threatSort) {
     if (indexDao instanceof MultiIndexDao) {
       this.indexDao = indexDao;
       MultiIndexDao multiIndexDao = (MultiIndexDao) indexDao;
@@ -124,8 +129,8 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
       );
     }
 
-    if (threatSort != null) {
-      this.threatSort = threatSort;
+    if (threatSort.isPresent()) {
+      this.threatSort = threatSort.get();
     }
   }
 
@@ -139,66 +144,63 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
     if (guid == null || guid.trim().isEmpty()) {
       throw new InvalidSearchException("Guid cannot be empty");
     }
-    org.elasticsearch.action.search.SearchResponse esResponse = getMetaAlertsForAlert(guid.trim());
-    SearchResponse searchResponse = new SearchResponse();
-    searchResponse.setTotal(esResponse.getHits().getTotalHits());
-    searchResponse.setResults(
-        Arrays.stream(esResponse.getHits().getHits()).map(searchHit -> {
-              SearchResult searchResult = new SearchResult();
-              searchResult.setId(searchHit.getId());
-              searchResult.setSource(searchHit.getSource());
-              searchResult.setScore(searchHit.getScore());
-              searchResult.setIndex(searchHit.getIndex());
-              return searchResult;
-            }
-        ).collect(Collectors.toList()));
-    return searchResponse;
+    // Searches for all active meta alerts whose nested alert array contains the given alert guid
+    QueryBuilder qb = boolQuery()
+        .must(
+            nestedQuery(
+                ALERT_FIELD,
+                boolQuery()
+                    .must(termQuery(ALERT_FIELD + "." + GUID, guid))
+            ).innerHit(new QueryInnerHitBuilder())
+        )
+        .must(termQuery(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()));
+    return queryAllResults(qb);
   }
 
   @Override
   @SuppressWarnings("unchecked")
   public MetaAlertCreateResponse createMetaAlert(MetaAlertCreateRequest request)
       throws InvalidCreateException, IOException {
-    if (request.getGuidToIndices().isEmpty()) {
-      throw new InvalidCreateException("MetaAlertCreateRequest must contain alert GUIDs");
+    List<GetRequest> alertRequests = request.getAlerts();
+    if (request.getAlerts().isEmpty()) {
+      throw new InvalidCreateException("MetaAlertCreateRequest must contain alerts");
     }
     if (request.getGroups().isEmpty()) {
       throw new InvalidCreateException("MetaAlertCreateRequest must contain UI groups");
     }
 
     // Retrieve the documents going into the meta alert and build it
-    MultiGetResponse multiGetResponse = getDocumentsByGuid(request);
-    Document createDoc = buildCreateDocument(multiGetResponse, request.getGroups());
-    MetaScores metaScores = calculateMetaScores(createDoc);
-    createDoc.getDocument().putAll(metaScores.getMetaScores());
-    createDoc.getDocument().put(threatTriageField, metaScores.getMetaScores().get(threatSort));
+    Iterable<Document> alerts = indexDao.getAllLatest(alertRequests);
+
+    Document metaAlert = buildCreateDocument(alerts, request.getGroups());
+    calculateMetaScores(metaAlert);
     // Add source type to be consistent with other sources and allow filtering
-    createDoc.getDocument().put("source:type", MetaAlertDao.METAALERT_TYPE);
+    metaAlert.getDocument().put(SOURCE_TYPE, MetaAlertDao.METAALERT_TYPE);
 
     // Start a list of updates / inserts we need to run
     Map<Document, Optional<String>> updates = new HashMap<>();
-    updates.put(createDoc, Optional.of(MetaAlertDao.METAALERTS_INDEX));
+    updates.put(metaAlert, Optional.of(MetaAlertDao.METAALERTS_INDEX));
 
     try {
       // We need to update the associated alerts with the new meta alerts, making sure existing
       // links are maintained.
-      List<String> metaAlertField;
-      for (MultiGetItemResponse itemResponse : multiGetResponse) {
-        metaAlertField = new ArrayList<>();
-        GetResponse response = itemResponse.getResponse();
-        if (response.isExists()) {
-          List<String> alertField = (List<String>) response.getSourceAsMap()
-              .get(MetaAlertDao.METAALERT_FIELD);
-          if (alertField != null) {
-            metaAlertField.addAll(alertField);
+      Map<String, Optional<String>> guidToIndices = alertRequests.stream().collect(Collectors.toMap(
+          GetRequest::getGuid, GetRequest::getIndex));
+      Map<String, String> guidToSensorTypes = alertRequests.stream().collect(Collectors.toMap(
+          GetRequest::getGuid, GetRequest::getSensorType));
+      for (Document alert: alerts) {
+        if (addMetaAlertToAlert(metaAlert.getGuid(), alert)) {
+          // Use the index in the request if it exists
+          Optional<String> index = guidToIndices.get(alert.getGuid());
+          if (!index.isPresent()) {
+            // Look up the index from Elasticsearch if one is not supplied in the request
+            index = elasticsearchDao.getIndexName(alert.getGuid(), guidToSensorTypes.get(alert.getGuid()));
+            if (!index.isPresent()) {
+              throw new IllegalArgumentException("Could not find index for " + alert.getGuid());
+            }
           }
+          updates.put(alert, index);
         }
-        metaAlertField.add(createDoc.getGuid());
-
-        Document alertUpdate = buildAlertUpdate(response.getId(),
-            (String) response.getSource().get(SOURCE_TYPE), metaAlertField,
-            (Long) response.getSourceAsMap().get("_timestamp"));
-        updates.put(alertUpdate, Optional.of(itemResponse.getIndex()));
       }
 
       // Kick off any updates.
@@ -206,7 +208,7 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
 
       MetaAlertCreateResponse createResponse = new MetaAlertCreateResponse();
       createResponse.setCreated(true);
-      createResponse.setGuid(createDoc.getGuid());
+      createResponse.setGuid(metaAlert.getGuid());
       return createResponse;
     } catch (IOException ioe) {
       throw new InvalidCreateException("Unable to create meta alert", ioe);
@@ -214,6 +216,149 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
   }
 
   @Override
+  public boolean addAlertsToMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests)
+      throws IOException {
+    Map<Document, Optional<String>> updates = new HashMap<>();
+    Document metaAlert = indexDao.getLatest(metaAlertGuid, METAALERT_TYPE);
+    if (MetaAlertStatus.ACTIVE.getStatusString().equals(metaAlert.getDocument().get(STATUS_FIELD))) {
+      Iterable<Document> alerts = indexDao.getAllLatest(alertRequests);
+      boolean metaAlertUpdated = addAlertsToMetaAlert(metaAlert, alerts);
+      if (metaAlertUpdated) {
+        calculateMetaScores(metaAlert);
+        updates.put(metaAlert, Optional.of(index));
+        for(Document alert: alerts) {
+          if (addMetaAlertToAlert(metaAlert.getGuid(), alert)) {
+            updates.put(alert, Optional.empty());
+          }
+        }
+        indexDaoUpdate(updates);
+      }
+      return metaAlertUpdated;
+    } else {
+      throw new IllegalStateException("Adding alerts to an INACTIVE meta alert is not allowed");
+    }
+  }
+
+  protected boolean addAlertsToMetaAlert(Document metaAlert, Iterable<Document> alerts) {
+    boolean alertAdded = false;
+    List<Map<String,Object>> currentAlerts = (List<Map<String, Object>>) metaAlert.getDocument().get(ALERT_FIELD);
+    Set<String> currentAlertGuids = currentAlerts.stream().map(currentAlert ->
+        (String) currentAlert.get(GUID)).collect(Collectors.toSet());
+    for (Document alert: alerts) {
+      String alertGuid = alert.getGuid();
+      // Only add an alert if it isn't already in the meta alert
+      if (!currentAlertGuids.contains(alertGuid)) {
+        currentAlerts.add(alert.getDocument());
+        alertAdded = true;
+      }
+    }
+    return alertAdded;
+  }
+
+  protected boolean addMetaAlertToAlert(String metaAlertGuid, Document alert) {
+    List<String> metaAlertField = new ArrayList<>();
+    List<String> alertField = (List<String>) alert.getDocument()
+        .get(MetaAlertDao.METAALERT_FIELD);
+    if (alertField != null) {
+      metaAlertField.addAll(alertField);
+    }
+    boolean metaAlertAdded = !metaAlertField.contains(metaAlertGuid);
+    if (metaAlertAdded) {
+      metaAlertField.add(metaAlertGuid);
+      alert.getDocument().put(MetaAlertDao.METAALERT_FIELD, metaAlertField);
+    }
+    return metaAlertAdded;
+  }
+
+  @Override
+  public boolean removeAlertsFromMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests)
+      throws IOException {
+    Map<Document, Optional<String>> updates = new HashMap<>();
+    Document metaAlert = indexDao.getLatest(metaAlertGuid, METAALERT_TYPE);
+    if (MetaAlertStatus.ACTIVE.getStatusString().equals(metaAlert.getDocument().get(STATUS_FIELD))) {
+      Iterable<Document> alerts = indexDao.getAllLatest(alertRequests);
+      Collection<String> alertGuids = alertRequests.stream().map(GetRequest::getGuid).collect(
+          Collectors.toList());
+      boolean metaAlertUpdated = removeAlertsFromMetaAlert(metaAlert, alertGuids);
+      if (metaAlertUpdated) {
+        calculateMetaScores(metaAlert);
+        updates.put(metaAlert, Optional.of(index));
+        for(Document alert: alerts) {
+          if (removeMetaAlertFromAlert(metaAlert.getGuid(), alert)) {
+            updates.put(alert, Optional.empty());
+          }
+        }
+        indexDaoUpdate(updates);
+      }
+      return metaAlertUpdated;
+    } else {
+      throw new IllegalStateException("Removing alerts from an INACTIVE meta alert is not allowed");
+    }
+
+  }
+
+  protected boolean removeAlertsFromMetaAlert(Document metaAlert, Collection<String> alertGuids) {
+    List<Map<String,Object>> currentAlerts = (List<Map<String, Object>>) metaAlert.getDocument().get(ALERT_FIELD);
+    int previousSize = currentAlerts.size();
+    // Only remove an alert if it is in the meta alert
+    currentAlerts.removeIf(currentAlert -> alertGuids.contains((String) currentAlert.get(GUID)));
+    return currentAlerts.size() != previousSize;
+  }
+
+  protected boolean removeMetaAlertFromAlert(String metaAlertGuid, Document alert) {
+    List<String> metaAlertField = new ArrayList<>();
+    List<String> alertField = (List<String>) alert.getDocument()
+        .get(MetaAlertDao.METAALERT_FIELD);
+    if (alertField != null) {
+      metaAlertField.addAll(alertField);
+    }
+    boolean metaAlertRemoved = metaAlertField.remove(metaAlertGuid);
+    if (metaAlertRemoved) {
+      alert.getDocument().put(MetaAlertDao.METAALERT_FIELD, metaAlertField);
+    }
+    return metaAlertRemoved;
+  }
+
+  @Override
+  public boolean updateMetaAlertStatus(String metaAlertGuid, MetaAlertStatus status)
+      throws IOException {
+    Map<Document, Optional<String>> updates = new HashMap<>();
+    Document metaAlert = indexDao.getLatest(metaAlertGuid, METAALERT_TYPE);
+    String currentStatus = (String) metaAlert.getDocument().get(MetaAlertDao.STATUS_FIELD);
+    boolean metaAlertUpdated = !status.getStatusString().equals(currentStatus);
+    if (metaAlertUpdated) {
+      metaAlert.getDocument().put(MetaAlertDao.STATUS_FIELD, status.getStatusString());
+      updates.put(metaAlert, Optional.of(index));
+      List<GetRequest> getRequests = new ArrayList<>();
+      List<Map<String, Object>> currentAlerts = (List<Map<String, Object>>) metaAlert.getDocument()
+          .get(MetaAlertDao.ALERT_FIELD);
+      currentAlerts.stream().forEach(currentAlert -> {
+        getRequests.add(new GetRequest((String) currentAlert.get(GUID), (String) currentAlert.get(SOURCE_TYPE)));
+      });
+      Iterable<Document> alerts = indexDao.getAllLatest(getRequests);
+      for (Document alert : alerts) {
+        boolean metaAlertAdded = false;
+        boolean metaAlertRemoved = false;
+        // If we're making it active, add the meta alert guid to every alert.
+        if (MetaAlertStatus.ACTIVE.equals(status)) {
+          metaAlertAdded = addMetaAlertToAlert(metaAlert.getGuid(), alert);
+        }
+        // If we're making it inactive, remove the meta alert guid from every alert.
+        if (MetaAlertStatus.INACTIVE.equals(status)) {
+          metaAlertRemoved = removeMetaAlertFromAlert(metaAlert.getGuid(), alert);
+        }
+        if (metaAlertAdded || metaAlertRemoved) {
+          updates.put(alert, Optional.empty());
+        }
+      }
+    }
+    if (metaAlertUpdated) {
+      indexDaoUpdate(updates);
+    }
+    return metaAlertUpdated;
+  }
+
+  @Override
   public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException {
     // Wrap the query to also get any meta-alerts.
     QueryBuilder qb = constantScoreQuery(boolQuery()
@@ -242,95 +387,171 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
   }
 
   @Override
+  public Iterable<Document> getAllLatest(
+      List<GetRequest> getRequests) throws IOException {
+    return indexDao.getAllLatest(getRequests);
+  }
+
+  @Override
   public void update(Document update, Optional<String> index) throws IOException {
     if (METAALERT_TYPE.equals(update.getSensorType())) {
       // We've been passed an update to the meta alert.
-      handleMetaUpdate(update);
+      throw new UnsupportedOperationException("Meta alerts cannot be directly updated");
     } else {
+      Map<Document, Optional<String>> updates = new HashMap<>();
+      updates.put(update, index);
       // We need to update an alert itself.  Only that portion of the update can be delegated.
       // We still need to get meta alerts potentially associated with it and update.
-      org.elasticsearch.action.search.SearchResponse response = getMetaAlertsForAlert(
-          update.getGuid()
-      );
-
-      // Each hit, if any, is a metaalert that needs to be updated
-      for (SearchHit hit : response.getHits()) {
-        handleAlertUpdate(update, hit);
+      Collection<Document> metaAlerts = getMetaAlertsForAlert(update.getGuid()).getResults().stream()
+          .map(searchResult -> new Document(searchResult.getSource(), searchResult.getId(), METAALERT_TYPE, 0L))
+          .collect(Collectors.toList());
+      // Each meta alert needs to be updated with the new alert
+      for (Document metaAlert : metaAlerts) {
+        replaceAlertInMetaAlert(metaAlert, update);
+        updates.put(metaAlert, Optional.of(METAALERTS_INDEX));
       }
 
       // Run the alert's update
-      indexDao.update(update, index);
+      indexDao.batchUpdate(updates);
     }
   }
 
+  protected boolean replaceAlertInMetaAlert(Document metaAlert, Document alert) {
+    boolean metaAlertUpdated = removeAlertsFromMetaAlert(metaAlert, Collections.singleton(alert.getGuid()));
+    if (metaAlertUpdated) {
+      addAlertsToMetaAlert(metaAlert, Collections.singleton(alert));
+    }
+    return metaAlertUpdated;
+  }
+
   @Override
   public void batchUpdate(Map<Document, Optional<String>> updates) throws IOException {
     throw new UnsupportedOperationException("Meta alerts do not allow for bulk updates");
   }
 
   /**
+   * Does not allow patches on the "alerts" or "status" fields.  These fields must be updated with their
+   * dedicated methods.
+   *
+   * @param request The patch request
+   * @param timestamp Optionally a timestamp to set. If not specified then current time is used.
+   * @throws OriginalNotFoundException If the document to be patched cannot be found
+   * @throws IOException If there's a problem running the update
+   */
+  @Override
+  public void patch(PatchRequest request, Optional<Long> timestamp)
+      throws OriginalNotFoundException, IOException {
+    if (isPatchAllowed(request)) {
+      Document d = getPatchedDocument(request, timestamp);
+      indexDao.update(d, Optional.ofNullable(request.getIndex()));
+    } else {
+      throw new IllegalArgumentException("Meta alert patches are not allowed for /alert or /status paths.  "
+          + "Please use the add/remove alert or update status functions instead.");
+    }
+  }
+
+  protected boolean isPatchAllowed(PatchRequest request) {
+    Iterator patchIterator = request.getPatch().iterator();
+    while(patchIterator.hasNext()) {
+      JsonNode patch = (JsonNode) patchIterator.next();
+      String path = patch.path("path").asText();
+      if (STATUS_PATH.equals(path) || ALERT_PATH.equals(path)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
    * Given an alert GUID, retrieve all associated meta alerts.
-   * @param guid The GUID of the child alert
+   * @param alertGuid The GUID of the child alert
    * @return The Elasticsearch response containing the meta alerts
    */
-  protected org.elasticsearch.action.search.SearchResponse getMetaAlertsForAlert(String guid) {
+  protected SearchResponse getMetaAlertsForAlert(String alertGuid) {
     QueryBuilder qb = boolQuery()
         .must(
             nestedQuery(
                 ALERT_FIELD,
                 boolQuery()
-                    .must(termQuery(ALERT_FIELD + "." + Constants.GUID, guid))
+                    .must(termQuery(ALERT_FIELD + "." + GUID, alertGuid))
             ).innerHit(new QueryInnerHitBuilder())
         )
         .must(termQuery(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()));
-    SearchRequest sr = new SearchRequest();
-    ArrayList<String> indices = new ArrayList<>();
-    indices.add(index);
-    sr.setIndices(indices);
-    return elasticsearchDao
+    return queryAllResults(qb);
+  }
+
+  /**
+   * Elasticsearch queries default to 10 records returned.  Some internal queries require that all
+   * results are returned.  Rather than setting an arbitrarily high size, this method pages through results
+   * and returns them all in a single SearchResponse.
+   * @param qb The query to run
+   * @return A SearchResponse containing every matching result, not just the first page
+   */
+  protected SearchResponse queryAllResults(QueryBuilder qb) {
+    SearchRequestBuilder searchRequestBuilder = elasticsearchDao
         .getClient()
         .prepareSearch(index)
         .addFields("*")
         .setFetchSource(true)
         .setQuery(qb)
+        .setSize(pageSize);
+    org.elasticsearch.action.search.SearchResponse esResponse = searchRequestBuilder
         .execute()
         .actionGet();
+    List<SearchResult> allResults = getSearchResults(esResponse);
+    long total = esResponse.getHits().getTotalHits();
+    if (total > pageSize) {
+      int pages = (int) (total / pageSize) + 1;
+      for (int i = 1; i < pages; i++) {
+        int from = i * pageSize;
+        searchRequestBuilder.setFrom(from);
+        esResponse = searchRequestBuilder
+            .execute()
+            .actionGet();
+        allResults.addAll(getSearchResults(esResponse));
+      }
+    }
+    SearchResponse searchResponse = new SearchResponse();
+    searchResponse.setTotal(total);
+    searchResponse.setResults(allResults);
+    return searchResponse;
   }
 
   /**
-   * Return child documents after retrieving them from Elasticsearch.
-   * @param request The request detailing which child alerts we need
-   * @return The Elasticsearch response to our request for alerts
+   * Transforms a list of Elasticsearch SearchHits to a list of SearchResults
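+   * @param searchResponse The Elasticsearch response whose hits should be converted
+   * @return One SearchResult per hit, carrying its id, source, score and index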
    */
-  protected MultiGetResponse getDocumentsByGuid(MetaAlertCreateRequest request) {
-    MultiGetRequestBuilder multiGet = elasticsearchDao.getClient().prepareMultiGet();
-    for (Entry<String, String> entry : request.getGuidToIndices().entrySet()) {
-      multiGet.add(new Item(entry.getValue(), null, entry.getKey()));
-    }
-    return multiGet.get();
+  protected List<SearchResult> getSearchResults(org.elasticsearch.action.search.SearchResponse searchResponse) {
+    return Arrays.stream(searchResponse.getHits().getHits()).map(searchHit -> {
+          SearchResult searchResult = new SearchResult();
+          searchResult.setId(searchHit.getId());
+          searchResult.setSource(searchHit.getSource());
+          searchResult.setScore(searchHit.getScore());
+          searchResult.setIndex(searchHit.getIndex());
+          return searchResult;
+        }
+    ).collect(Collectors.toList());
   }
 
   /**
    * Build the Document representing a meta alert to be created.
-   * @param multiGetResponse The Elasticsearch results for the meta alerts child documents
+   * @param alerts The child alert Documents to include in the meta alert
    * @param groups The groups used to create this meta alert
    * @return A Document representing the new meta alert
    */
-  protected Document buildCreateDocument(MultiGetResponse multiGetResponse, List<String> groups) {
+  protected Document buildCreateDocument(Iterable<Document> alerts, List<String> groups) {
     // Need to create a Document from the multiget. Scores will be calculated later
     Map<String, Object> metaSource = new HashMap<>();
     List<Map<String, Object>> alertList = new ArrayList<>();
-    for (MultiGetItemResponse itemResponse : multiGetResponse) {
-      GetResponse response = itemResponse.getResponse();
-      if (response.isExists()) {
-        alertList.add(response.getSource());
-      }
+    for (Document alert: alerts) {
+      alertList.add(alert.getDocument());
     }
     metaSource.put(ALERT_FIELD, alertList);
 
     // Add any meta fields
     String guid = UUID.randomUUID().toString();
-    metaSource.put(Constants.GUID, guid);
+    metaSource.put(GUID, guid);
     metaSource.put(Constants.Fields.TIMESTAMP.getName(), System.currentTimeMillis());
     metaSource.put(GROUPS_FIELD, groups);
     metaSource.put(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString());
@@ -339,29 +560,6 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
   }
 
   /**
-   * Process an update to a meta alert itself.
-   * @param update The update Document to be applied
-   * @throws IOException If there's a problem running the update
-   */
-  protected void handleMetaUpdate(Document update) throws IOException {
-    Map<Document, Optional<String>> updates = new HashMap<>();
-
-    if (update.getDocument().containsKey(MetaAlertDao.STATUS_FIELD)) {
-      // Update all associated alerts to maintain the meta alert link properly
-      updates.putAll(buildStatusAlertUpdates(update));
-    }
-    if (update.getDocument().containsKey(MetaAlertDao.ALERT_FIELD)) {
-      // If the alerts field changes (i.e. add/remove alert), update all affected alerts to
-      // maintain the meta alert link properly.
-      updates.putAll(buildAlertFieldUpdates(update));
-    }
-
-    // Run meta alert update.
-    updates.put(update, Optional.of(index));
-    indexDaoUpdate(updates);
-  }
-
-  /**
    * Calls the single update variant if there's only one update, otherwise calls batch.
    * @param updates The list of updates to run
    * @throws IOException If there's an update error
@@ -375,203 +573,6 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
     } // else we have no updates, so don't do anything
   }
 
-  protected Map<Document, Optional<String>> buildStatusAlertUpdates(Document update)
-      throws IOException {
-    Map<Document, Optional<String>> updates = new HashMap<>();
-    List<Map<String, Object>> alerts = getAllAlertsForMetaAlert(update);
-    for (Map<String, Object> alert : alerts) {
-      // Retrieve the associated alert, so we can update the array
-      List<String> metaAlertField = new ArrayList<>();
-      @SuppressWarnings("unchecked")
-      List<String> alertField = (List<String>) alert.get(MetaAlertDao.METAALERT_FIELD);
-      if (alertField != null) {
-        metaAlertField.addAll(alertField);
-      }
-      String status = (String) update.getDocument().get(MetaAlertDao.STATUS_FIELD);
-
-      Document alertUpdate = null;
-      String alertGuid = (String) alert.get(Constants.GUID);
-      // If we're making it active add add the meta alert guid for every alert.
-      if (MetaAlertStatus.ACTIVE.getStatusString().equals(status)
-          && !metaAlertField.contains(update.getGuid())) {
-        metaAlertField.add(update.getGuid());
-        alertUpdate = buildAlertUpdate(
-            alertGuid,
-            (String) alert.get(SOURCE_TYPE),
-            metaAlertField,
-            (Long) alert.get("_timestamp")
-        );
-      }
-
-      // If we're making it inactive, remove the meta alert guid from every alert.
-      if (MetaAlertStatus.INACTIVE.getStatusString().equals(status)
-          && metaAlertField.remove(update.getGuid())) {
-        alertUpdate = buildAlertUpdate(
-            alertGuid,
-            (String) alert.get(SOURCE_TYPE),
-            metaAlertField,
-            (Long) alert.get("_timestamp")
-        );
-      }
-
-      // Only run an alert update if we have an actual update.
-      if (alertUpdate != null) {
-        updates.put(alertUpdate, Optional.empty());
-      }
-    }
-    return updates;
-  }
-
-  protected Map<Document, Optional<String>> buildAlertFieldUpdates(Document update)
-      throws IOException {
-    Map<Document, Optional<String>> updates = new HashMap<>();
-    // If we've updated the alerts field (i.e add/remove), recalculate meta alert scores and
-    // the metaalerts fields for updating the children alerts.
-    MetaScores metaScores = calculateMetaScores(update);
-    update.getDocument().putAll(metaScores.getMetaScores());
-    update.getDocument().put(threatTriageField, metaScores.getMetaScores().get(threatSort));
-
-    // Get the set of GUIDs that are in the new version.
-    Set<String> updateGuids = new HashSet<>();
-    @SuppressWarnings("unchecked")
-    List<Map<String, Object>> updateAlerts = (List<Map<String, Object>>) update.getDocument()
-        .get(MetaAlertDao.ALERT_FIELD);
-    for (Map<String, Object> alert : updateAlerts) {
-      updateGuids.add((String) alert.get(Constants.GUID));
-    }
-
-    // Get the set of GUIDs from the old version
-    List<Map<String, Object>> alerts = getAllAlertsForMetaAlert(update);
-    Set<String> currentGuids = new HashSet<>();
-    for (Map<String, Object> alert : alerts) {
-      currentGuids.add((String) alert.get(Constants.GUID));
-    }
-
-    // Get both set differences, so we know what's been added and removed.
-    Set<String> removedGuids = SetUtils.difference(currentGuids, updateGuids);
-    Set<String> addedGuids = SetUtils.difference(updateGuids, currentGuids);
-
-    Document alertUpdate;
-
-    // Handle any removed GUIDs
-    for (String guid : removedGuids) {
-      // Retrieve the associated alert, so we can update the array
-      Document alert = elasticsearchDao.getLatest(guid, null);
-      List<String> metaAlertField = new ArrayList<>();
-      @SuppressWarnings("unchecked")
-      List<String> alertField = (List<String>) alert.getDocument()
-          .get(MetaAlertDao.METAALERT_FIELD);
-      if (alertField != null) {
-        metaAlertField.addAll(alertField);
-      }
-      if (metaAlertField.remove(update.getGuid())) {
-        alertUpdate = buildAlertUpdate(guid, alert.getSensorType(), metaAlertField,
-            alert.getTimestamp());
-        updates.put(alertUpdate, Optional.empty());
-      }
-    }
-
-    // Handle any added GUIDs
-    for (String guid : addedGuids) {
-      // Retrieve the associated alert, so we can update the array
-      Document alert = elasticsearchDao.getLatest(guid, null);
-      List<String> metaAlertField = new ArrayList<>();
-      @SuppressWarnings("unchecked")
-      List<String> alertField = (List<String>) alert.getDocument()
-          .get(MetaAlertDao.METAALERT_FIELD);
-      if (alertField != null) {
-        metaAlertField.addAll(alertField);
-      }
-      metaAlertField.add(update.getGuid());
-      alertUpdate = buildAlertUpdate(guid, alert.getSensorType(), metaAlertField,
-          alert.getTimestamp());
-      updates.put(alertUpdate, Optional.empty());
-    }
-
-    return updates;
-  }
-
-  @SuppressWarnings("unchecked")
-  protected List<Map<String, Object>> getAllAlertsForMetaAlert(Document update) throws IOException {
-    Document latest = indexDao.getLatest(update.getGuid(), MetaAlertDao.METAALERT_TYPE);
-    if (latest == null) {
-      return new ArrayList<>();
-    }
-    List<String> guids = new ArrayList<>();
-    List<Map<String, Object>> latestAlerts = (List<Map<String, Object>>) latest.getDocument()
-        .get(MetaAlertDao.ALERT_FIELD);
-    for (Map<String, Object> alert : latestAlerts) {
-      guids.add((String) alert.get(Constants.GUID));
-    }
-
-    List<Map<String, Object>> alerts = new ArrayList<>();
-    QueryBuilder query = QueryBuilders.idsQuery().ids(guids);
-    SearchRequestBuilder request = elasticsearchDao.getClient().prepareSearch()
-        .setQuery(query);
-    org.elasticsearch.action.search.SearchResponse response = request.get();
-    for (SearchHit hit : response.getHits().getHits()) {
-      alerts.add(hit.sourceAsMap());
-    }
-    return alerts;
-  }
-
-  /**
-   * Builds an update Document for updating the meta alerts list.
-   * @param alertGuid The GUID of the alert to update
-   * @param sensorType The sensor type to update
-   * @param metaAlertField The new metaAlertList to use
-   * @return The update Document
-   */
-  protected Document buildAlertUpdate(String alertGuid, String sensorType,
-      List<String> metaAlertField, Long timestamp) {
-    Document alertUpdate;
-    Map<String, Object> document = new HashMap<>();
-    document.put(MetaAlertDao.METAALERT_FIELD, metaAlertField);
-    alertUpdate = new Document(
-        document,
-        alertGuid,
-        sensorType,
-        timestamp
-    );
-    return alertUpdate;
-  }
-
-  /**
-   * Takes care of upserting a child alert to a meta alert.
-   * @param update The update Document to be applied
-   * @param hit The meta alert to be updated
-   * @throws IOException If there's an issue running the upsert
-   */
-  protected void handleAlertUpdate(Document update, SearchHit hit) throws IOException {
-    XContentBuilder builder = buildUpdatedMetaAlert(update, hit);
-
-    // Run the meta alert's update
-    IndexRequest indexRequest = new IndexRequest(
-        METAALERTS_INDEX,
-        METAALERT_DOC,
-        hit.getId()
-    ).source(builder);
-    UpdateRequest updateRequest = new UpdateRequest(
-        METAALERTS_INDEX,
-        METAALERT_DOC,
-        hit.getId()
-    ).doc(builder).upsert(indexRequest);
-    try {
-      UpdateResponse updateResponse = elasticsearchDao.getClient().update(updateRequest).get();
-
-      ShardInfo shardInfo = updateResponse.getShardInfo();
-      int failed = shardInfo.getFailed();
-      if (failed > 0) {
-        throw new IOException(
-            "ElasticsearchMetaAlertDao upsert failed: "
-                + Arrays.toString(shardInfo.getFailures())
-        );
-      }
-    } catch (Exception e) {
-      throw new IOException(e.getMessage(), e);
-    }
-  }
-
   @Override
   public Map<String, Map<String, FieldType>> getColumnMetadata(List<String> indices)
       throws IOException {
@@ -595,80 +596,26 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
 
   /**
    * Calculate the meta alert scores for a Document.
-   * @param document The Document containing scores
+   * @param metaAlert The Document containing scores
    * @return Set of score statistics
    */
   @SuppressWarnings("unchecked")
-  protected MetaScores calculateMetaScores(Document document) {
-    List<Object> alertsRaw = ((List<Object>) document.getDocument().get(ALERT_FIELD));
-    if (alertsRaw == null || alertsRaw.isEmpty()) {
-      throw new IllegalArgumentException("No alerts to use in calculation for doc GUID: "
-          + document.getDocument().get(Constants.GUID));
-    }
-
-    ArrayList<Double> scores = new ArrayList<>();
-    for (Object alertRaw : alertsRaw) {
-      Map<String, Object> alert = (Map<String, Object>) alertRaw;
-      Double scoreNum = parseThreatField(alert.get(threatTriageField));
-      if (scoreNum != null) {
-        scores.add(scoreNum);
-      }
-    }
-
-    return new MetaScores(scores);
-  }
-
-  /**
-   * Builds the updated meta alert based on the update.
-   * @param update The update Document for the meta alert
-   * @param hit The meta alert to be updated
-   * @return A builder for Elasticsearch to use
-   * @throws IOException If we have an issue building the result
-   */
-  protected XContentBuilder buildUpdatedMetaAlert(Document update, SearchHit hit)
-      throws IOException {
-    // Make sure to get all the threat scores while we're going through the docs
-    List<Double> scores = new ArrayList<>();
-    // Start building the new version of the metaalert
-    XContentBuilder builder = jsonBuilder().startObject();
-
-    // Run through the nested alerts of the meta alert and either use the new or old versions
-    builder.startArray(ALERT_FIELD);
-    Map<String, Object> hitAlerts = hit.sourceAsMap();
-
-    @SuppressWarnings("unchecked")
-    List<Map<String, Object>> alertHits = (List<Map<String, Object>>) hitAlerts.get(ALERT_FIELD);
-    for (Map<String, Object> alertHit : alertHits) {
-      Map<String, Object> docMap = alertHit;
-      // If we're at the update use it instead of the original
-      if (alertHit.get(Constants.GUID).equals(update.getGuid())) {
-        docMap = update.getDocument();
-      }
-      builder.map(docMap);
-
-      // Handle either String or Number values in the threatTriageField
-      Object threatRaw = docMap.get(threatTriageField);
-      Double threat = parseThreatField(threatRaw);
-
-      if (threat != null) {
-        scores.add(threat);
-      }
-    }
-    builder.endArray();
-
-    // Add all the meta alert fields, and score calculation
-    Map<String, Object> updatedMeta = new HashMap<>();
-    updatedMeta.putAll(hit.getSource());
-    updatedMeta.putAll(new MetaScores(scores).getMetaScores());
-    for (Entry<String, Object> entry : updatedMeta.entrySet()) {
-      // The alerts field is being added separately, so ignore the original
-      if (!(entry.getKey().equals(ALERT_FIELD))) {
-        builder.field(entry.getKey(), entry.getValue());
+  protected void calculateMetaScores(Document metaAlert) {
+    MetaScores metaScores = new MetaScores(new ArrayList<>());
+    List<Object> alertsRaw = ((List<Object>) metaAlert.getDocument().get(ALERT_FIELD));
+    if (alertsRaw != null && !alertsRaw.isEmpty()) {
+      ArrayList<Double> scores = new ArrayList<>();
+      for (Object alertRaw : alertsRaw) {
+        Map<String, Object> alert = (Map<String, Object>) alertRaw;
+        Double scoreNum = parseThreatField(alert.get(threatTriageField));
+        if (scoreNum != null) {
+          scores.add(scoreNum);
+        }
       }
+      metaScores = new MetaScores(scores);
     }
-    builder.endObject();
-
-    return builder;
+    metaAlert.getDocument().putAll(metaScores.getMetaScores());
+    metaAlert.getDocument().put(threatTriageField, metaScores.getMetaScores().get(threatSort));
   }
 
   private Double parseThreatField(Object threatRaw) {
@@ -680,4 +627,12 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
     }
     return threat;
   }
+
+  public int getPageSize() {
+    return pageSize;
+  }
+
+  public void setPageSize(int pageSize) {
+    this.pageSize = pageSize;
+  }
 }
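
The page arithmetic in queryAllResults above can be sketched in isolation. This is only an illustration, not the committed code: the class PagingSketch and its two methods are hypothetical names, and it uses ceiling division where the commit uses (total / pageSize) + 1. With the committed formula, a total that is an exact multiple of pageSize (for example 20 hits at pageSize 10) appears to issue one extra request at from=20, which would return no hits; the ceiling form avoids that request.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Metron: computes which "from" offsets a
// paged query must request after the first page has been fetched.
class PagingSketch {

  /** Number of pages needed to cover all hits (ceiling division). */
  public static int pageCount(long total, int pageSize) {
    if (pageSize <= 0) {
      throw new IllegalArgumentException("pageSize must be positive");
    }
    return (int) ((total + pageSize - 1) / pageSize);
  }

  /** Offsets for every page after the first, which starts at offset 0. */
  public static List<Integer> remainingOffsets(long total, int pageSize) {
    List<Integer> offsets = new ArrayList<>();
    int pages = pageCount(total, pageSize);
    for (int page = 1; page < pages; page++) {
      offsets.add(page * pageSize);
    }
    return offsets;
  }
}
```

For 25 hits at a page size of 10, remainingOffsets yields the offsets 10 and 20; for 20 hits it yields only 10.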

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/MetaAlertStatus.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/MetaAlertStatus.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/MetaAlertStatus.java
deleted file mode 100644
index 6c8e858..0000000
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/MetaAlertStatus.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.elasticsearch.dao;
-
-public enum MetaAlertStatus {
-  ACTIVE("active"),
-  INACTIVE("inactive");
-
-  private String statusString;
-
-  MetaAlertStatus(String statusString) {
-    this.statusString = statusString;
-  }
-
-  public String getStatusString() {
-    return statusString;
-  }
-}
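
The enum removed from this package still drives the link maintenance in ElasticsearchMetaAlertDao earlier in this commit: making a meta alert active adds its GUID to each child alert, and making it inactive removes it. A minimal sketch of that rule follows, assuming the child alert's meta alert links are held in a plain list of GUID strings. StatusLinkSketch and applyStatus are hypothetical names, and the nested Status enum only mirrors the shape of MetaAlertStatus shown above.

```java
import java.util.List;

// Hypothetical illustration, not Metron code: applies a meta alert status
// change to the list of meta alert GUIDs stored on one child alert.
class StatusLinkSketch {

  // Mirrors the shape of the MetaAlertStatus enum in the diff above.
  enum Status {
    ACTIVE("active"),
    INACTIVE("inactive");

    private final String statusString;

    Status(String statusString) {
      this.statusString = statusString;
    }

    public String getStatusString() {
      return statusString;
    }
  }

  /**
   * Returns true only if the list changed, so that the caller can skip
   * writing alerts that need no update.
   */
  public static boolean applyStatus(Status status, String metaAlertGuid,
      List<String> metaAlertGuids) {
    switch (status) {
      case ACTIVE:
        // Add the link once; a second activation is a no-op.
        if (metaAlertGuids.contains(metaAlertGuid)) {
          return false;
        }
        metaAlertGuids.add(metaAlertGuid);
        return true;
      case INACTIVE:
        // List.remove(Object) reports whether the GUID was present.
        return metaAlertGuids.remove(metaAlertGuid);
      default:
        return false;
    }
  }
}
```

Returning a changed flag matches the metaAlertAdded / metaAlertRemoved booleans in the commit, which queue an alert for update only when one of them is true.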

