metron-commits mailing list archives

From mmiklav...@apache.org
Subject [03/10] metron git commit: METRON-939: Upgrade ElasticSearch and Kibana (mmiklavc via mmiklavc) closes apache/metron#840
Date Mon, 08 Jan 2018 19:10:10 GMT
http://git-wip-us.apache.org/repos/asf/metron/blob/e8213918/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerTest.java
deleted file mode 100644
index df485f0..0000000
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerTest.java
+++ /dev/null
@@ -1,855 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.dataloads.bulk;
-
-import com.carrotsearch.hppc.ObjectObjectHashMap;
-import org.apache.commons.collections.IteratorUtils;
-import org.apache.metron.TestConstants;
-import org.apache.metron.common.configuration.Configuration;
-import org.easymock.EasyMock;
-import org.elasticsearch.action.*;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
-import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
-import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
-import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
-import org.elasticsearch.action.admin.indices.alias.exists.AliasesExistRequestBuilder;
-import org.elasticsearch.action.admin.indices.alias.exists.AliasesExistResponse;
-import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
-import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequestBuilder;
-import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
-import org.elasticsearch.action.admin.indices.analyze.AnalyzeRequest;
-import org.elasticsearch.action.admin.indices.analyze.AnalyzeRequestBuilder;
-import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse;
-import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest;
-import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheRequestBuilder;
-import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheResponse;
-import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
-import org.elasticsearch.action.admin.indices.close.CloseIndexRequestBuilder;
-import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequestBuilder;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
-import org.elasticsearch.action.admin.indices.exists.types.TypesExistsRequest;
-import org.elasticsearch.action.admin.indices.exists.types.TypesExistsRequestBuilder;
-import org.elasticsearch.action.admin.indices.exists.types.TypesExistsResponse;
-import org.elasticsearch.action.admin.indices.flush.*;
-import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
-import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequestBuilder;
-import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder;
-import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
-import org.elasticsearch.action.admin.indices.mapping.get.*;
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
-import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
-import org.elasticsearch.action.admin.indices.open.OpenIndexRequestBuilder;
-import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
-import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest;
-import org.elasticsearch.action.admin.indices.recovery.RecoveryRequestBuilder;
-import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
-import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
-import org.elasticsearch.action.admin.indices.refresh.RefreshRequestBuilder;
-import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
-import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
-import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
-import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequestBuilder;
-import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
-import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequestBuilder;
-import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
-import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
-import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder;
-import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
-import org.elasticsearch.action.admin.indices.shards.IndicesShardStoreRequestBuilder;
-import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresRequest;
-import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
-import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
-import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
-import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
-import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
-import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequestBuilder;
-import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse;
-import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesRequest;
-import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesRequestBuilder;
-import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
-import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
-import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
-import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
-import org.elasticsearch.action.admin.indices.upgrade.get.UpgradeStatusRequest;
-import org.elasticsearch.action.admin.indices.upgrade.get.UpgradeStatusRequestBuilder;
-import org.elasticsearch.action.admin.indices.upgrade.get.UpgradeStatusResponse;
-import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
-import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequestBuilder;
-import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeResponse;
-import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryRequest;
-import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryRequestBuilder;
-import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryResponse;
-import org.elasticsearch.action.admin.indices.warmer.delete.DeleteWarmerRequest;
-import org.elasticsearch.action.admin.indices.warmer.delete.DeleteWarmerRequestBuilder;
-import org.elasticsearch.action.admin.indices.warmer.delete.DeleteWarmerResponse;
-import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersRequest;
-import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersRequestBuilder;
-import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersResponse;
-import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerRequest;
-import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerRequestBuilder;
-import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse;
-import org.elasticsearch.client.AdminClient;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.ClusterAdminClient;
-import org.elasticsearch.client.IndicesAdminClient;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.cluster.metadata.MetaData;
-import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
-import org.elasticsearch.index.IndexNotFoundException;
-import org.elasticsearch.threadpool.ThreadPool;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.PrintStream;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.easymock.PowerMock.replayAll;
-import static org.powermock.api.easymock.PowerMock.verifyAll;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(DeleteIndexResponse.class)
-public class ElasticsearchDataPrunerTest {
-
-    private Date testDate;
-    private DateFormat dateFormat = new SimpleDateFormat("yyyy.MM.dd.HH");
-    private Configuration configuration;
-
-    private Client indexClient = mock(Client.class);
-    private AdminClient adminClient = mock(AdminClient.class);
-    private IndicesAdminClient indicesAdminClient = new TestIndicesAdminClient();
-    private DeleteIndexRequestBuilder deleteIndexRequestBuilder = mock(DeleteIndexRequestBuilder.class);
-    private DeleteIndexRequest deleteIndexRequest = mock(DeleteIndexRequest.class);
-    private ActionFuture<DeleteIndexResponse> deleteIndexAction = mock(ActionFuture.class);
-    private DeleteIndexResponse deleteIndexResponse = PowerMock.createMock(DeleteIndexResponse.class);
-
-
-    private ByteArrayOutputStream outContent;
-    private ByteArrayOutputStream errContent;
-
-    @Before
-    public void setUp() throws Exception {
-
-        Calendar calendar = Calendar.getInstance();
-        calendar.set(Calendar.MONTH, Calendar.MARCH);
-        calendar.set(Calendar.YEAR, 2016);
-        calendar.set(Calendar.DATE, 31);
-        calendar.set(Calendar.HOUR_OF_DAY, 0);
-        calendar.set(Calendar.MINUTE, 0);
-        calendar.set(Calendar.SECOND, 0);
-        calendar.set(Calendar.MILLISECOND,0);
-        testDate = calendar.getTime();
-
-        when(indexClient.admin()).thenReturn(adminClient);
-        when(adminClient.indices()).thenReturn(indicesAdminClient);
-        when(deleteIndexRequestBuilder.request()).thenReturn(deleteIndexRequest);
-        when(deleteIndexAction.actionGet()).thenReturn(deleteIndexResponse);
-
-        File resourceFile = new File(TestConstants.SAMPLE_CONFIG_PATH);
-        Path resourcePath = Paths.get(resourceFile.getCanonicalPath());
-
-        configuration = new Configuration(resourcePath);
-
-        outContent = new ByteArrayOutputStream();
-        errContent = new ByteArrayOutputStream();
-
-        System.setOut(new PrintStream(outContent));
-        System.setErr(new PrintStream(errContent));
-
-    }
-
-    @Test(expected = IndexNotFoundException.class)
-    public void testWillThrowOnMissingIndex() throws Exception {
-
-        ((TestIndicesAdminClient)indicesAdminClient).throwMissingIndex = true;
-        ElasticsearchDataPruner pruner = new ElasticsearchDataPruner(testDate, 30, configuration, indexClient,"*");
-        pruner.deleteIndex(adminClient, "baz");
-        ((TestIndicesAdminClient)indicesAdminClient).throwMissingIndex = false;
-
-    }
-
-    @Test
-    public void testDeletesCorrectIndexes() throws Exception {
-
-        //Mock Cluster Admin
-        ClusterAdminClient clusterAdminClient = mock(ClusterAdminClient.class);
-        ClusterStateRequestBuilder clusterStateRequestBuilder = mock(ClusterStateRequestBuilder.class);
-        ClusterStateResponse clusterStateResponse = mock(ClusterStateResponse.class);
-        ClusterState clusterState = mock(ClusterState.class);
-        ObjectObjectHashMap<String, IndexMetaData> clusterIndexes = new ObjectObjectHashMap();
-        MetaData clusterMetadata = mock(MetaData.class);
-        when(adminClient.cluster()).thenReturn(clusterAdminClient);
-        when(clusterAdminClient.prepareState()).thenReturn(clusterStateRequestBuilder);
-        when(clusterStateRequestBuilder.get()).thenReturn(clusterStateResponse);
-        when(clusterStateResponse.getState()).thenReturn(clusterState);
-        when(clusterState.getMetaData()).thenReturn(clusterMetadata);
-
-        int numDays = 5;
-
-        Date indexDate = new Date();
-
-        indexDate.setTime(testDate.getTime() - TimeUnit.DAYS.toMillis(numDays));
-
-        for (int i = 0; i < numDays * 24; i++) {
-
-            String indexName = "sensor_index_" + dateFormat.format(indexDate);
-            clusterIndexes.put(indexName, null);
-            indexDate.setTime(indexDate.getTime() + TimeUnit.HOURS.toMillis(1));
-
-        }
-
-        when(clusterMetadata.getIndices()).thenReturn(ImmutableOpenMap.copyOf(clusterIndexes));
-
-
-        EasyMock.expect(deleteIndexResponse.isAcknowledged()).andReturn(true);
-
-        replayAll();
-        ElasticsearchDataPruner pruner = new ElasticsearchDataPruner(testDate, 1, configuration, indexClient, "sensor_index_");
-        pruner.indexClient = indexClient;
-        Long deleteCount = pruner.prune();
-        assertEquals("Should have pruned 24 indices", 24L, deleteCount.longValue());
-        verifyAll();
-
-    }
-
-    @Test
-    public void testFilter() throws Exception {
-
-        ObjectObjectHashMap<String, IndexMetaData> indexNames = new ObjectObjectHashMap();
-        SimpleDateFormat dateChecker = new SimpleDateFormat("yyyyMMdd");
-        int numDays = 5;
-        String[] expectedIndices = new String[24];
-        Date indexDate = new Date();
-
-        indexDate.setTime(testDate.getTime() - TimeUnit.DAYS.toMillis(numDays));
-
-        for (int i = 0, j=0; i < numDays * 24; i++) {
-
-            String indexName = "sensor_index_" + dateFormat.format(indexDate);
-            //Delete 20160330
-            if( dateChecker.format(indexDate).equals("20160330") ){
-                expectedIndices[j++] = indexName;
-            }
-
-            indexNames.put(indexName, null);
-            indexDate.setTime(indexDate.getTime() + TimeUnit.HOURS.toMillis(1));
-
-        }
-
-        ImmutableOpenMap<String, IndexMetaData> testIndices = ImmutableOpenMap.copyOf(indexNames);
-
-        ElasticsearchDataPruner pruner = new ElasticsearchDataPruner(testDate, 1, configuration,  indexClient, "sensor_index_");
-        pruner.indexClient = indexClient;
-
-        Iterable<String> filteredIndices = pruner.getFilteredIndices(testIndices);
-
-        Object[] indexArray = IteratorUtils.toArray(filteredIndices.iterator());
-        Arrays.sort(indexArray);
-        Arrays.sort(expectedIndices);
-
-        assertArrayEquals(expectedIndices,indexArray);
-
-    }
-
-    class TestIndicesAdminClient implements IndicesAdminClient {
-
-        public boolean throwMissingIndex = false;
-
-        @Override
-        public ActionFuture<DeleteIndexResponse> delete(DeleteIndexRequest request) {
-
-            if(throwMissingIndex){
-
-                throw new IndexNotFoundException("TEST EXCEPTION!");
-
-            }
-
-            return deleteIndexAction;
-
-        }
-
-
-        @Override
-        public ActionFuture<IndicesExistsResponse> exists(IndicesExistsRequest request) {
-            return null;
-        }
-
-        @Override
-        public void exists(IndicesExistsRequest request, ActionListener<IndicesExistsResponse> listener) {
-
-        }
-
-        @Override
-        public IndicesExistsRequestBuilder prepareExists(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<TypesExistsResponse> typesExists(TypesExistsRequest request) {
-            return null;
-        }
-
-        @Override
-        public void typesExists(TypesExistsRequest request, ActionListener<TypesExistsResponse> listener) {
-
-        }
-
-        @Override
-        public TypesExistsRequestBuilder prepareTypesExists(String... index) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<IndicesStatsResponse> stats(IndicesStatsRequest request) {
-            return null;
-        }
-
-        @Override
-        public void stats(IndicesStatsRequest request, ActionListener<IndicesStatsResponse> listener) {
-
-        }
-
-        @Override
-        public IndicesStatsRequestBuilder prepareStats(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<RecoveryResponse> recoveries(RecoveryRequest request) {
-            return null;
-        }
-
-        @Override
-        public void recoveries(RecoveryRequest request, ActionListener<RecoveryResponse> listener) {
-
-        }
-
-        @Override
-        public RecoveryRequestBuilder prepareRecoveries(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<IndicesSegmentResponse> segments(IndicesSegmentsRequest request) {
-            return null;
-        }
-
-        @Override
-        public void segments(IndicesSegmentsRequest request, ActionListener<IndicesSegmentResponse> listener) {
-
-        }
-
-        @Override
-        public IndicesSegmentsRequestBuilder prepareSegments(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<IndicesShardStoresResponse> shardStores(IndicesShardStoresRequest request) {
-            return null;
-        }
-
-        @Override
-        public void shardStores(IndicesShardStoresRequest request, ActionListener<IndicesShardStoresResponse> listener) {
-
-        }
-
-        @Override
-        public IndicesShardStoreRequestBuilder prepareShardStores(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<CreateIndexResponse> create(CreateIndexRequest request) {
-            return null;
-        }
-
-        @Override
-        public void create(CreateIndexRequest request, ActionListener<CreateIndexResponse> listener) {
-
-        }
-
-        @Override
-        public CreateIndexRequestBuilder prepareCreate(String index) {
-            return null;
-        }
-
-
-        @Override
-        public void delete(DeleteIndexRequest request, ActionListener<DeleteIndexResponse> listener) {
-
-        }
-
-        @Override
-        public DeleteIndexRequestBuilder prepareDelete(String... indices) {
-            return deleteIndexRequestBuilder;
-        }
-
-        @Override
-        public ActionFuture<CloseIndexResponse> close(CloseIndexRequest request) {
-            return null;
-        }
-
-        @Override
-        public void close(CloseIndexRequest request, ActionListener<CloseIndexResponse> listener) {
-
-        }
-
-        @Override
-        public CloseIndexRequestBuilder prepareClose(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<OpenIndexResponse> open(OpenIndexRequest request) {
-            return null;
-        }
-
-        @Override
-        public void open(OpenIndexRequest request, ActionListener<OpenIndexResponse> listener) {
-
-        }
-
-        @Override
-        public OpenIndexRequestBuilder prepareOpen(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<RefreshResponse> refresh(RefreshRequest request) {
-            return null;
-        }
-
-        @Override
-        public void refresh(RefreshRequest request, ActionListener<RefreshResponse> listener) {
-
-        }
-
-        @Override
-        public RefreshRequestBuilder prepareRefresh(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<FlushResponse> flush(FlushRequest request) {
-            return null;
-        }
-
-        @Override
-        public void flush(FlushRequest request, ActionListener<FlushResponse> listener) {
-
-        }
-
-        @Override
-        public FlushRequestBuilder prepareFlush(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<SyncedFlushResponse> syncedFlush(SyncedFlushRequest request) {
-            return null;
-        }
-
-        @Override
-        public void syncedFlush(SyncedFlushRequest request, ActionListener<SyncedFlushResponse> listener) {
-
-        }
-
-        @Override
-        public SyncedFlushRequestBuilder prepareSyncedFlush(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<ForceMergeResponse> forceMerge(ForceMergeRequest request) {
-            return null;
-        }
-
-        @Override
-        public void forceMerge(ForceMergeRequest request, ActionListener<ForceMergeResponse> listener) {
-
-        }
-
-        @Override
-        public ForceMergeRequestBuilder prepareForceMerge(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<UpgradeResponse> upgrade(UpgradeRequest request) {
-            return null;
-        }
-
-        @Override
-        public void upgrade(UpgradeRequest request, ActionListener<UpgradeResponse> listener) {
-
-        }
-
-        @Override
-        public UpgradeStatusRequestBuilder prepareUpgradeStatus(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<UpgradeStatusResponse> upgradeStatus(UpgradeStatusRequest request) {
-            return null;
-        }
-
-        @Override
-        public void upgradeStatus(UpgradeStatusRequest request, ActionListener<UpgradeStatusResponse> listener) {
-
-        }
-
-        @Override
-        public UpgradeRequestBuilder prepareUpgrade(String... indices) {
-            return null;
-        }
-
-        @Override
-        public void getMappings(GetMappingsRequest request, ActionListener<GetMappingsResponse> listener) {
-
-        }
-
-        @Override
-        public ActionFuture<GetMappingsResponse> getMappings(GetMappingsRequest request) {
-            return null;
-        }
-
-        @Override
-        public GetMappingsRequestBuilder prepareGetMappings(String... indices) {
-            return null;
-        }
-
-        @Override
-        public void getFieldMappings(GetFieldMappingsRequest request, ActionListener<GetFieldMappingsResponse> listener) {
-
-        }
-
-        @Override
-        public GetFieldMappingsRequestBuilder prepareGetFieldMappings(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<GetFieldMappingsResponse> getFieldMappings(GetFieldMappingsRequest request) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<PutMappingResponse> putMapping(PutMappingRequest request) {
-            return null;
-        }
-
-        @Override
-        public void putMapping(PutMappingRequest request, ActionListener<PutMappingResponse> listener) {
-
-        }
-
-        @Override
-        public PutMappingRequestBuilder preparePutMapping(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<IndicesAliasesResponse> aliases(IndicesAliasesRequest request) {
-            return null;
-        }
-
-        @Override
-        public void aliases(IndicesAliasesRequest request, ActionListener<IndicesAliasesResponse> listener) {
-
-        }
-
-        @Override
-        public IndicesAliasesRequestBuilder prepareAliases() {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<GetAliasesResponse> getAliases(GetAliasesRequest request) {
-            return null;
-        }
-
-        @Override
-        public void getAliases(GetAliasesRequest request, ActionListener<GetAliasesResponse> listener) {
-
-        }
-
-        @Override
-        public GetAliasesRequestBuilder prepareGetAliases(String... aliases) {
-            return null;
-        }
-
-        @Override
-        public AliasesExistRequestBuilder prepareAliasesExist(String... aliases) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<AliasesExistResponse> aliasesExist(GetAliasesRequest request) {
-            return null;
-        }
-
-        @Override
-        public void aliasesExist(GetAliasesRequest request, ActionListener<AliasesExistResponse> listener) {
-
-        }
-
-        @Override
-        public ActionFuture<GetIndexResponse> getIndex(GetIndexRequest request) {
-            return null;
-        }
-
-        @Override
-        public void getIndex(GetIndexRequest request, ActionListener<GetIndexResponse> listener) {
-
-        }
-
-        @Override
-        public GetIndexRequestBuilder prepareGetIndex() {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<ClearIndicesCacheResponse> clearCache(ClearIndicesCacheRequest request) {
-            return null;
-        }
-
-        @Override
-        public void clearCache(ClearIndicesCacheRequest request, ActionListener<ClearIndicesCacheResponse> listener) {
-
-        }
-
-        @Override
-        public ClearIndicesCacheRequestBuilder prepareClearCache(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<UpdateSettingsResponse> updateSettings(UpdateSettingsRequest request) {
-            return null;
-        }
-
-        @Override
-        public void updateSettings(UpdateSettingsRequest request, ActionListener<UpdateSettingsResponse> listener) {
-
-        }
-
-        @Override
-        public UpdateSettingsRequestBuilder prepareUpdateSettings(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<AnalyzeResponse> analyze(AnalyzeRequest request) {
-            return null;
-        }
-
-        @Override
-        public void analyze(AnalyzeRequest request, ActionListener<AnalyzeResponse> listener) {
-
-        }
-
-        @Override
-        public AnalyzeRequestBuilder prepareAnalyze(@Nullable String index, String text) {
-            return null;
-        }
-
-        @Override
-        public AnalyzeRequestBuilder prepareAnalyze(String text) {
-            return null;
-        }
-
-        @Override
-        public AnalyzeRequestBuilder prepareAnalyze() {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<PutIndexTemplateResponse> putTemplate(PutIndexTemplateRequest request) {
-            return null;
-        }
-
-        @Override
-        public void putTemplate(PutIndexTemplateRequest request, ActionListener<PutIndexTemplateResponse> listener) {
-
-        }
-
-        @Override
-        public PutIndexTemplateRequestBuilder preparePutTemplate(String name) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<DeleteIndexTemplateResponse> deleteTemplate(DeleteIndexTemplateRequest request) {
-            return null;
-        }
-
-        @Override
-        public void deleteTemplate(DeleteIndexTemplateRequest request, ActionListener<DeleteIndexTemplateResponse> listener) {
-
-        }
-
-        @Override
-        public DeleteIndexTemplateRequestBuilder prepareDeleteTemplate(String name) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<GetIndexTemplatesResponse> getTemplates(GetIndexTemplatesRequest request) {
-            return null;
-        }
-
-        @Override
-        public void getTemplates(GetIndexTemplatesRequest request, ActionListener<GetIndexTemplatesResponse> listener) {
-
-        }
-
-        @Override
-        public GetIndexTemplatesRequestBuilder prepareGetTemplates(String... name) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<ValidateQueryResponse> validateQuery(ValidateQueryRequest request) {
-            return null;
-        }
-
-        @Override
-        public void validateQuery(ValidateQueryRequest request, ActionListener<ValidateQueryResponse> listener) {
-
-        }
-
-        @Override
-        public ValidateQueryRequestBuilder prepareValidateQuery(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<PutWarmerResponse> putWarmer(PutWarmerRequest request) {
-            return null;
-        }
-
-        @Override
-        public void putWarmer(PutWarmerRequest request, ActionListener<PutWarmerResponse> listener) {
-
-        }
-
-        @Override
-        public PutWarmerRequestBuilder preparePutWarmer(String name) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<DeleteWarmerResponse> deleteWarmer(DeleteWarmerRequest request) {
-            return null;
-        }
-
-        @Override
-        public void deleteWarmer(DeleteWarmerRequest request, ActionListener<DeleteWarmerResponse> listener) {
-
-        }
-
-        @Override
-        public DeleteWarmerRequestBuilder prepareDeleteWarmer() {
-            return null;
-        }
-
-        @Override
-        public void getWarmers(GetWarmersRequest request, ActionListener<GetWarmersResponse> listener) {
-
-        }
-
-        @Override
-        public ActionFuture<GetWarmersResponse> getWarmers(GetWarmersRequest request) {
-            return null;
-        }
-
-        @Override
-        public GetWarmersRequestBuilder prepareGetWarmers(String... indices) {
-            return null;
-        }
-
-        @Override
-        public void getSettings(GetSettingsRequest request, ActionListener<GetSettingsResponse> listener) {
-
-        }
-
-        @Override
-        public ActionFuture<GetSettingsResponse> getSettings(GetSettingsRequest request) {
-            return null;
-        }
-
-        @Override
-        public GetSettingsRequestBuilder prepareGetSettings(String... indices) {
-            return null;
-        }
-
-        @Override
-        public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder> action, Request request) {
-            return null;
-        }
-
-        @Override
-        public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
-
-        }
-
-        @Override
-        public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> RequestBuilder prepareExecute(Action<Request, Response, RequestBuilder> action) {
-            return null;
-        }
-
-        @Override
-        public ThreadPool threadPool() {
-            return null;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/e8213918/metron-platform/metron-elasticsearch/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/README.md b/metron-platform/metron-elasticsearch/README.md
index 7278a25..1e15018 100644
--- a/metron-platform/metron-elasticsearch/README.md
+++ b/metron-platform/metron-elasticsearch/README.md
@@ -17,6 +17,15 @@ limitations under the License.
 -->
 # Elasticsearch in Metron
 
+## Table of Contents
+
+* [Introduction](#introduction)
+* [Properties](#properties)
+* [Upgrading to 5.6.2](#upgrading-to-562)
+* [Type Mappings](#type-mappings)
+* [Using Metron with Elasticsearch 5.6.2](#using-metron-with-elasticsearch-562)
+* [Installing Elasticsearch Templates](#installing-elasticsearch-templates)
+
 ## Introduction
 
 Elasticsearch can be used as the real-time portion of the datastore resulting from [metron-indexing](../metron-indexing/README.md).
@@ -50,9 +59,219 @@ For instance, an `es.date.format` of `yyyy.MM.dd.HH` would have the consequence
 roll hourly, whereas an `es.date.format` of `yyyy.MM.dd` would have the consequence that the indices would
 roll daily.
 
-## Using Metron with Elasticsearch 2.x
+## Upgrading to 5.6.2
+
+Users should be prepared to re-index when migrating from Elasticsearch 2.3.3 to 5.6.2. There are a number of template changes, most notably around
+string type handling, that may cause issues when upgrading.
+
+[https://www.elastic.co/guide/en/elasticsearch/reference/5.6/setup-upgrade.html](https://www.elastic.co/guide/en/elasticsearch/reference/5.6/setup-upgrade.html)
+
+Be aware that if you add a new string field and want to be able to filter and search on its value from the Alerts UI, you **must** add a mapping for that field to
+the appropriate Elasticsearch template. Below is more detail on how to choose the appropriate mapping type for your string value.
+
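+For illustration only (the `user_agent` field name and the `ELASTICSEARCH`/`SENSOR` shell variables are placeholders; the call mirrors the mapping update shown later in this document), a new string field intended for exact filtering could be mapped as a keyword on existing indices as follows. The sensor's template needs an equivalent entry so that newly rolled indices pick it up.
+
+```
+curl -XPUT "http://${ELASTICSEARCH}:9200/${SENSOR}_index*/_mapping/${SENSOR}_doc" -d '
+{
+  "properties" : {
+    "user_agent" : {
+      "type" : "keyword"
+    }
+  }
+}
+'
+```
+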
+## Type Mappings
+
+Type mappings have changed quite a bit from ES 2.x -> 5.x. Here is a brief rundown of the biggest changes. More detailed references from Elasticsearch
+are provided in the [Type Mapping References](#type-mapping-references) section below.
+* string fields replaced by text/keyword type
+* strings have new default mappings as follows
+
+    ```
+    {
+      "type": "text",
+      "fields": {
+        "keyword": {
+          "type": "keyword",
+          "ignore_above": 256
+        }
+      }
+    }
+    ```
+
+* There is no longer a `_timestamp` field that you can set "enabled" on; including it in a template now causes an exception.
+Replace it with an application-created timestamp of "date" type (see the sketch following this list).
+
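+As a rough illustration (assuming the timestamp is written as epoch milliseconds), the replacement field could be mapped as:
+
+```
+"timestamp": {
+  "type": "date",
+  "format": "epoch_millis"
+}
+```
+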
+The semantics for string types have changed. In 2.x, a string field's "index" setting is either "analyzed" or "not_analyzed", which roughly correspond to "full text" and "keyword", respectively.
+Analyzed text means the indexer splits the text using a text analyzer, allowing you to search on terms within the original text. "New York" is split and indexed as two buckets,
+"New" and "York", so you can search or aggregate on those terms independently, and queries for the individual terms "New" or "York" will match.
+"Keyword" means the original text is not split/analyzed during indexing and is instead treated as a whole unit, i.e. "New" or "York" will not match in searches against the document containing "New York",
+but searching on "New York" as the full city name will. In 5.x, instead of using the "index" setting, you set the "type" to either "text" for full text or "keyword" for keywords.
+
+Below is a table depicting the changes to how String types are now handled.
+
+<table>
+<tr>
+	<th>sort, aggregate, or access values</th>
+	<th>ES 2.x</th>
+	<th>ES 5.x</th>
+	<th>Example</th>
+</tr>
+<tr>
+	<td>no</td>
+	<td>
+<pre><code>"my_property" : {
+  "type": "string",
+  "index": "analyzed"
+}
+</code></pre>
+	</td>
+	<td>
+<pre><code>"my_property" : {
+  "type": "text"
+}
+</code></pre>
+    Additional defaults: "index": "true", "fielddata": "false"
+	</td>
+	<td>
+		"New York" handled via in-mem search as "New" and "York" buckets. <strong>No</strong> aggregation or sort.
+	</td>
+</tr>
+<tr>
+	<td>
+	yes
+	</td>
+	<td>
+<pre><code>"my_property": {
+  "type": "string",
+  "index": "analyzed"
+}
+</code></pre>
+	</td>
+	<td>
+<pre><code>"my_property": {
+  "type": "text",
+  "fielddata": "true"
+}
+</code></pre>
+	</td>
+	<td>
+	"New York" handled via in-mem search as "New" and "York" buckets. <strong>Can</strong> aggregate and sort.
+	</td>
+</tr>
+<tr>
+	<td>
+	yes
+	</td>
+	<td>
+<pre><code>"my_property": {
+  "type": "string",
+  "index": "not_analyzed"
+}
+</code></pre>
+	</td>
+	<td>
+<pre><code>"my_property" : {
+  "type": "keyword"
+}
+</code></pre>
+	</td>
+	<td>
+	"New York" searchable as single value. <strong>Can</strong> aggregate and sort. A search for "New" or "York" will not match against the whole value.
+	</td>
+</tr>
+<tr>
+	<td>
+	yes
+	</td>
+	<td>
+<pre><code>"my_property": {
+  "type": "string",
+  "index": "analyzed"
+}
+</code></pre>
+	</td>
+	<td>
+<pre><code>"my_property": {
+  "type": "text",
+  "fields": {
+    "keyword": {
+      "type": "keyword",
+      "ignore_above": 256
+    }
+  }
+}
+</code></pre>
+	</td>
+	<td>
+	"New York" searchable as single value or as text document, can aggregate and sort on the sub term "keyword."
+	</td>
+</tr>
+</table>
 
-With Elasticsearch 2.x, there is a requirement that all sensors templates have a nested alert field defined.  This field is a dummy field, and will be obsolete in Elasticsearch 5.x.  See [Ignoring Unmapped Fields](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-sort.html#_ignoring_unmapped_fields) for more information
+If you want to set default string behavior for all strings for a given index and type, you can do so with a mapping similar to the following (replace ${your_type_here} accordingly):
+
+```
+# curl -XPUT 'http://${ES_HOST}:${ES_PORT}/_template/default_string_template' -d '
+{
+  "template": "*",
+  "mappings" : {
+    "${your_type_here}": {
+      "dynamic_templates": [
+        {
+          "strings": {
+            "match_mapping_type": "string",
+            "mapping": {
+              "type": "text"
+            }
+          }
+        }
+      ]
+    }
+  }
+}
+'
+```
+
+By specifying the "template" property with value "*", the template will apply to all indices that have documents indexed of the specified type (${your_type_here}). This results in the following template.
+
+```
+# curl -XGET 'http://${ES_HOST}:${ES_PORT}/_template/default_string_template?pretty'
+{
+  "default_string_template" : {
+    "order" : 0,
+    "template" : "*",
+    "settings" : { },
+    "mappings" : {
+      "${your_type_here}" : {
+        "dynamic_templates" : [
+          {
+            "strings" : {
+              "match_mapping_type" : "string",
+              "mapping" : {
+                "type" : "text"
+              }
+            }
+          }
+        ]
+      }
+    },
+    "aliases" : { }
+  }
+}
+```
+
+Notes on other settings for types in ES (an illustrative mapping sketch follows this list):
+* doc_values
+    * on-disk data structure
+    * provides access for sorting, aggregation, and field values
+    * stores same values as _source, but in column-oriented fashion better for sorting and aggregating
+    * not supported on text fields
+    * enabled by default
+* fielddata
+    * in-memory data structure
+    * provides access for sorting, aggregation, and field values
+    * primarily for text fields
+    * disabled by default because the heap space required can be large
+
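+As a rough sketch only (the field names are hypothetical), these settings are toggled per field in a mapping:
+
+```
+"my_text_field": {
+  "type": "text",
+  "fielddata": true
+},
+"my_keyword_field": {
+  "type": "keyword",
+  "doc_values": false
+}
+```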
+
+##### Type Mapping References
+* [https://www.elastic.co/guide/en/elasticsearch/reference/5.6/mapping.html](https://www.elastic.co/guide/en/elasticsearch/reference/5.6/mapping.html)
+* [https://www.elastic.co/guide/en/elasticsearch/reference/5.6/breaking_50_mapping_changes.html](https://www.elastic.co/guide/en/elasticsearch/reference/5.6/breaking_50_mapping_changes.html)
+* [https://www.elastic.co/blog/strings-are-dead-long-live-strings](https://www.elastic.co/blog/strings-are-dead-long-live-strings)
+
+## Using Metron with Elasticsearch 5.6.2
+
+There is a requirement that all sensor templates have a nested alert field defined.  This field is a dummy field.  See [Ignoring Unmapped Fields](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-sort.html#_ignoring_unmapped_fields) for more information.
 
 Without this field, an error will be thrown during ALL searches (including from UIs, resulting in no alerts being found for any sensor). This error will be found in the REST service's logs.
 
@@ -63,7 +282,7 @@ QueryParsingException[[nested] failed to find nested object under path [alert]];
 
 There are two steps to resolve this issue.  The first is to update the Elasticsearch template for each sensor, so any new indices have the field. This requires retrieving the template, removing an extraneous JSON field so we can put it back later, and adding our new field.
 
-Make sure to set the ELASTICSEARCH variable appropriately. $SENSOR can contain wildcards, so if rollover has occurred, it's not necessary to do each index individually. The example here appends `index*` to get all indexes for a the provided sensor.
+Make sure to set the ELASTICSEARCH variable appropriately. $SENSOR can contain wildcards, so if rollover has occurred, it's not necessary to do each index individually. The example here appends `index*` to get all indexes for the provided sensor.
 
 ```
 export ELASTICSEARCH="node1"
@@ -89,11 +308,11 @@ To update existing indexes, update Elasticsearch mappings with the new field for
 ```
 curl -XPUT "http://${ELASTICSEARCH}:9200/${SENSOR}_index*/_mapping/${SENSOR}_doc" -d '
 {
-        "properties" : {
-          "alert" : {
-            "type" : "nested"
-          }
-        }
+  "properties" : {
+    "alert" : {
+      "type" : "nested"
+    }
+  }
 }
 '
 rm ${SENSOR}.template

http://git-wip-us.apache.org/repos/asf/metron/blob/e8213918/metron-platform/metron-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/pom.xml b/metron-platform/metron-elasticsearch/pom.xml
index d924891..97f4062 100644
--- a/metron-platform/metron-elasticsearch/pom.xml
+++ b/metron-platform/metron-elasticsearch/pom.xml
@@ -176,8 +176,14 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-core</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.mockito</groupId>
-            <artifactId>mockito-all</artifactId>
+            <artifactId>mockito-core</artifactId>
             <version>${global_mockito_version}</version>
             <scope>test</scope>
         </dependency>
@@ -200,6 +206,16 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>${global_log4j_core_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${global_log4j_core_version}</version>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/metron/blob/e8213918/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchImportExport.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchImportExport.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchImportExport.java
new file mode 100644
index 0000000..0a04dfc
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchImportExport.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.bulk;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.metron.common.utils.JSONUtils;
+
+/**
+ * This is a utility for taking a file of JSON objects that were exported from ES and transforming
+ * it into a bulk import format. This was useful for backing up and restoring the Kibana dashboard
+ * index. The notable gap is that it expects one record per line in the file, which is not how
+ * ES generally returns results. Elasticsearch-dump was used as the intermediary to export data in
+ * the desired format for consumption by this tool.
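+ * <p>
+ * Illustrative sketch with hypothetical data: an exported line such as
+ * <pre>
+ * {"_index":".kibana","_type":"doc","_id":"abc123","_source":{"title":"My Dashboard"}}
+ * </pre>
+ * is rewritten as the two bulk-format lines
+ * <pre>
+ * { "create" : { "_id": "abc123", "_type": "doc" } }
+ * {"title":"My Dashboard"}
+ * </pre>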
+ * @see <a href="https://github.com/taskrabbit/elasticsearch-dump">https://github.com/taskrabbit/elasticsearch-dump</a>
+ */
+public class ElasticsearchImportExport {
+
+  public static void main(String[] args) {
+    if (args.length != 2) {
+      throw new RuntimeException("Expects 'input' and 'output' file arguments.");
+    }
+    final String inPath = args[0];
+    final String outPath = args[1];
+    try {
+      new ElasticsearchImportExport().bulkify(Paths.get(inPath), Paths.get(outPath));
+    } catch (IOException e) {
+      e.printStackTrace();
+      System.exit(1);
+    }
+    System.exit(0);
+  }
+
+  /**
+   * Takes a file of line-delimited JSON objects and converts them to an Elasticsearch bulk import
+   * format.
+   *
+   * @param input input JSON file (note, each line is expected to be a separate complete JSON
+   * object, not the file as a whole.)
+   * @param output Elasticsearch bulk import file.
+   * @throws IOException
+   */
+  public void bulkify(Path input, Path output) throws IOException {
+    List<String> outRecs = new ArrayList<String>();
+    try (BufferedReader br = new BufferedReader(new FileReader(input.toFile()))) {
+      String line;
+      while ((line = br.readLine()) != null) {
+        Map<String, Object> inDoc = JSONUtils.INSTANCE
+            .load(line, new TypeReference<Map<String, Object>>() {
+            });
+        Object id = inDoc.get("_id");
+        Object type = inDoc.get("_type");
+        String createRaw = String
+            .format("{ \"create\" : { \"_id\": \"%s\", \"_type\": \"%s\" } }", id, type);
+        String outData = JSONUtils.INSTANCE.toJSON(inDoc.get("_source"), false);
+        outRecs.add(createRaw);
+        outRecs.add(outData);
+      }
+    }
+    try (BufferedWriter br = new BufferedWriter(new FileWriter(output.toFile()))) {
+      for (String line : outRecs) {
+        br.write(line);
+        br.write(System.lineSeparator());
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/e8213918/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
index 9f21994..c12802e 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
@@ -48,7 +48,8 @@ public class ElasticsearchColumnMetadataDao implements ColumnMetadataDao {
   private static Map<String, FieldType> elasticsearchTypeMap;
   static {
     Map<String, FieldType> fieldTypeMap = new HashMap<>();
-    fieldTypeMap.put("string", FieldType.STRING);
+    fieldTypeMap.put("text", FieldType.TEXT);
+    fieldTypeMap.put("keyword", FieldType.KEYWORD);
     fieldTypeMap.put("ip", FieldType.IP);
     fieldTypeMap.put("integer", FieldType.INTEGER);
     fieldTypeMap.put("long", FieldType.LONG);

http://git-wip-us.apache.org/repos/asf/metron/blob/e8213918/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index 910c09b..650462e 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -37,27 +37,32 @@ import org.apache.metron.indexing.dao.search.SearchResult;
 import org.apache.metron.indexing.dao.search.SortField;
 import org.apache.metron.indexing.dao.search.SortOrder;
 import org.apache.metron.indexing.dao.update.Document;
-import org.elasticsearch.action.ActionWriteResponse.ShardInfo;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
 import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.index.mapper.ip.IpFieldMapper;
+import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.index.mapper.LegacyIpFieldMapper;
+import org.elasticsearch.index.query.IdsQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.QueryStringQueryBuilder;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.aggregations.Aggregation;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order;
-import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.sum.Sum;
-import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder;
+import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.sort.FieldSortBuilder;
 import org.slf4j.Logger;
@@ -124,8 +129,27 @@ public class ElasticsearchDao implements IndexDao {
     //uninitialized.
   }
 
+  private static Map<String, FieldType> elasticsearchSearchTypeMap;
+
+  static {
+    Map<String, FieldType> fieldTypeMap = new HashMap<>();
+    fieldTypeMap.put("text", FieldType.TEXT);
+    fieldTypeMap.put("keyword", FieldType.KEYWORD);
+    fieldTypeMap.put("ip", FieldType.IP);
+    fieldTypeMap.put("integer", FieldType.INTEGER);
+    fieldTypeMap.put("long", FieldType.LONG);
+    fieldTypeMap.put("date", FieldType.DATE);
+    fieldTypeMap.put("float", FieldType.FLOAT);
+    fieldTypeMap.put("double", FieldType.DOUBLE);
+    fieldTypeMap.put("boolean", FieldType.BOOLEAN);
+    elasticsearchSearchTypeMap = Collections.unmodifiableMap(fieldTypeMap);
+  }
+
   @Override
   public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException {
+    if(searchRequest.getQuery() == null) {
+      throw new InvalidSearchException("Search query is invalid: null");
+    }
     return search(searchRequest, new QueryStringQueryBuilder(searchRequest.getQuery()));
   }
 
@@ -162,14 +186,15 @@ public class ElasticsearchDao implements IndexDao {
   private org.elasticsearch.action.search.SearchRequest buildSearchRequest(
           SearchRequest searchRequest,
           QueryBuilder queryBuilder) throws InvalidSearchException {
-
-    LOG.debug("Got search request; request={}", ElasticsearchUtils.toJSON(searchRequest).orElse("???"));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Got search request; request={}", ElasticsearchUtils.toJSON(searchRequest).orElse("???"));
+    }
     SearchSourceBuilder searchBuilder = new SearchSourceBuilder()
             .size(searchRequest.getSize())
             .from(searchRequest.getFrom())
             .query(queryBuilder)
             .trackScores(true);
-
+    Optional<List<String>> fields = searchRequest.getFields();
     // column metadata needed to understand the type of each sort field
     Map<String, FieldType> meta;
     try {
@@ -202,24 +227,30 @@ public class ElasticsearchDao implements IndexDao {
     }
 
     // handle search fields
-    if (searchRequest.getFields().isPresent()) {
-      searchBuilder.fields(searchRequest.getFields().get());
+    if (fields.isPresent()) {
+      searchBuilder.fetchSource("*", null);
     } else {
       searchBuilder.fetchSource(true);
     }
 
+    Optional<List<String>> facetFields = searchRequest.getFacetFields();
+
     // handle facet fields
     if (searchRequest.getFacetFields().isPresent()) {
+      // https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/_bucket_aggregations.html
       for(String field : searchRequest.getFacetFields().get()) {
-        String name = getFacentAggregationName(field);
-        TermsBuilder terms = new TermsBuilder(name).field(field);
+        String name = getFacetAggregationName(field);
+        TermsAggregationBuilder terms = AggregationBuilders.terms( name).field(field);
+               // new TermsBuilder(name).field(field);
         searchBuilder.aggregation(terms);
       }
     }
 
     // return the search request
     String[] indices = wildcardIndices(searchRequest.getIndices());
-    LOG.debug("Built Elasticsearch request; indices={}, request={}", indices, searchBuilder.toString());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Built Elasticsearch request; indices={}, request={}", indices, searchBuilder.toString());
+    }
     return new org.elasticsearch.action.search.SearchRequest()
             .indices(indices)
             .source(searchBuilder);
@@ -240,12 +271,13 @@ public class ElasticsearchDao implements IndexDao {
           org.elasticsearch.action.search.SearchResponse esResponse) throws InvalidSearchException {
 
     SearchResponse searchResponse = new SearchResponse();
+
     searchResponse.setTotal(esResponse.getHits().getTotalHits());
 
     // search hits --> search results
     List<SearchResult> results = new ArrayList<>();
     for(SearchHit hit: esResponse.getHits().getHits()) {
-      results.add(getSearchResult(hit, searchRequest.getFields().isPresent()));
+      results.add(getSearchResult(hit, searchRequest.getFields()));
     }
     searchResponse.setResults(results);
 
@@ -263,7 +295,9 @@ public class ElasticsearchDao implements IndexDao {
       searchResponse.setFacetCounts(getFacetCounts(facetFields, esResponse.getAggregations(), commonColumnMetadata ));
     }
 
-    LOG.debug("Built search response; response={}", ElasticsearchUtils.toJSON(searchResponse).orElse("???"));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Built search response; response={}", ElasticsearchUtils.toJSON(searchResponse).orElse("???"));
+    }
     return searchResponse;
   }
 
@@ -309,7 +343,7 @@ public class ElasticsearchDao implements IndexDao {
           QueryBuilder queryBuilder) {
 
     // handle groups
-    TermsBuilder groups = getGroupsTermBuilder(groupRequest, 0);
+    TermsAggregationBuilder groups = getGroupsTermBuilder(groupRequest, 0);
     final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
             .query(queryBuilder)
             .aggregation(groups);
@@ -446,16 +480,24 @@ public class ElasticsearchDao implements IndexDao {
    */
   <T> List<T> searchByGuids(Collection<String> guids, Collection<String> sensorTypes,
       Function<SearchHit, Optional<T>> callback) {
-    QueryBuilder query;
+    if(guids == null || guids.isEmpty()) {
+      return Collections.emptyList();
+    }
+    QueryBuilder query = null;
+    IdsQueryBuilder idsQuery = null;
     if (sensorTypes != null) {
       String[] types = sensorTypes.stream().map(sensorType -> sensorType + "_doc").toArray(String[]::new);
-      query = QueryBuilders.idsQuery(types).ids(guids);
+      idsQuery = QueryBuilders.idsQuery(types);
     } else {
-      query = QueryBuilders.idsQuery().ids(guids);
+      idsQuery = QueryBuilders.idsQuery();
     }
+
+    for(String guid : guids) {
+      query = idsQuery.addIds(guid);
+    }
+
     SearchRequestBuilder request = client.prepareSearch()
                                          .setQuery(query)
-                                         .setSource("message")
                                          .setSize(guids.size())
                                          ;
     org.elasticsearch.action.search.SearchResponse response = request.get();
@@ -569,7 +611,7 @@ public class ElasticsearchDao implements IndexDao {
     Map<String, Map<String, Long>> fieldCounts = new HashMap<>();
     for (String field: fields) {
       Map<String, Long> valueCounts = new HashMap<>();
-      Aggregation aggregation = aggregations.get(getFacentAggregationName(field));
+      Aggregation aggregation = aggregations.get(getFacetAggregationName(field));
       if (aggregation instanceof Terms) {
         Terms terms = (Terms) aggregation;
         terms.getBuckets().stream().forEach(bucket -> valueCounts.put(formatKey(bucket.getKey(), commonColumnMetadata.get(field)), bucket.getDocCount()));
@@ -580,8 +622,8 @@ public class ElasticsearchDao implements IndexDao {
   }
 
   private String formatKey(Object key, FieldType type) {
-    if (FieldType.IP.equals(type)) {
-      return IpFieldMapper.longToIp((Long) key);
+    if (FieldType.IP.equals(type) && key instanceof Long) {
+      return LegacyIpFieldMapper.longToIp((Long) key);
     } else if (FieldType.BOOLEAN.equals(type)) {
       return (Long) key == 1 ? "true" : "false";
     } else {
@@ -589,11 +631,12 @@ public class ElasticsearchDao implements IndexDao {
     }
   }
 
-  private TermsBuilder getGroupsTermBuilder(GroupRequest groupRequest, int index) {
+  private TermsAggregationBuilder getGroupsTermBuilder(GroupRequest groupRequest, int index) {
     List<Group> groups = groupRequest.getGroups();
     Group group = groups.get(index);
     String aggregationName = getGroupByAggregationName(group.getField());
-    TermsBuilder termsBuilder = new TermsBuilder(aggregationName)
+    TermsAggregationBuilder termsBuilder = AggregationBuilders.terms(aggregationName);
+    termsBuilder
         .field(group.getField())
         .size(accessConfig.getMaxSearchGroups())
         .order(getElasticsearchGroupOrder(group.getOrder()));
@@ -602,7 +645,8 @@ public class ElasticsearchDao implements IndexDao {
     }
     Optional<String> scoreField = groupRequest.getScoreField();
     if (scoreField.isPresent()) {
-      termsBuilder.subAggregation(new SumBuilder(getSumAggregationName(scoreField.get())).field(scoreField.get()).missing(0));
+      SumAggregationBuilder scoreSumAggregationBuilder = AggregationBuilders.sum(getSumAggregationName(scoreField.get())).field(scoreField.get()).missing(0);
+      termsBuilder.subAggregation(scoreSumAggregationBuilder);
     }
     return termsBuilder;
   }
@@ -630,14 +674,15 @@ public class ElasticsearchDao implements IndexDao {
     return searchResultGroups;
   }
 
-  private SearchResult getSearchResult(SearchHit searchHit, boolean fieldsPresent) {
+  private SearchResult getSearchResult(SearchHit searchHit, Optional<List<String>> fields) {
     SearchResult searchResult = new SearchResult();
     searchResult.setId(searchHit.getId());
     Map<String, Object> source;
-    if (fieldsPresent) {
+    if (fields.isPresent()) {
+      Map<String, Object> resultSourceAsMap = searchHit.getSourceAsMap();
       source = new HashMap<>();
-      searchHit.getFields().forEach((key, value) -> {
-        source.put(key, value.getValues().size() == 1 ? value.getValue() : value.getValues());
+      fields.get().forEach(field -> {
+        source.put(field, resultSourceAsMap.get(field));
       });
     } else {
       source = searchHit.getSource();
@@ -648,7 +693,7 @@ public class ElasticsearchDao implements IndexDao {
     return searchResult;
   }
 
-  private String getFacentAggregationName(String field) {
+  private String getFacetAggregationName(String field) {
     return String.format("%s_count", field);
   }
 

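The hunks above replace the Elasticsearch 2.x TermsBuilder/SumBuilder classes with the 5.x AggregationBuilders factory. A minimal sketch of that builder pattern follows; the field and aggregation names are illustrative placeholders, not Metron's.

    import org.elasticsearch.search.aggregations.AggregationBuilders;
    import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
    import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
    import org.elasticsearch.search.builder.SearchSourceBuilder;

    public class TermsAggregationSketch {
      public static SearchSourceBuilder sourceWithFacet(String field, String scoreField) {
        // 2.x "new TermsBuilder(name).field(field)" becomes the factory call below.
        TermsAggregationBuilder terms = AggregationBuilders.terms(field + "_count").field(field);
        // A sum sub-aggregation replaces the old SumBuilder in the same way.
        SumAggregationBuilder sum = AggregationBuilders.sum(scoreField + "_sum").field(scoreField).missing(0);
        terms.subAggregation(sum);
        return new SearchSourceBuilder().aggregation(terms);
      }
    }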
http://git-wip-us.apache.org/repos/asf/metron/blob/e8213918/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
index f8fb145..9740272 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
@@ -27,19 +27,11 @@ import static org.elasticsearch.index.query.QueryBuilders.termQuery;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
 import java.util.stream.Collectors;
+import org.apache.commons.collections4.SetUtils;
+import org.apache.lucene.search.join.ScoreMode;
 import org.apache.metron.common.Constants;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
@@ -59,6 +51,22 @@ import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.search.SearchResult;
 import org.apache.metron.indexing.dao.update.Document;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.get.MultiGetItemResponse;
+import org.elasticsearch.action.get.MultiGetRequest.Item;
+import org.elasticsearch.action.get.MultiGetRequestBuilder;
+import org.elasticsearch.action.get.MultiGetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.action.update.UpdateResponse;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.query.InnerHitBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.QueryStringQueryBuilder;
+import org.elasticsearch.search.SearchHit;
 import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
 import org.apache.metron.indexing.dao.update.PatchRequest;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
@@ -66,7 +74,6 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.QueryStringQueryBuilder;
-import org.elasticsearch.index.query.support.QueryInnerHitBuilder;
 
 public class ElasticsearchMetaAlertDao implements MetaAlertDao {
 
@@ -163,8 +170,9 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
             nestedQuery(
                 ALERT_FIELD,
                 boolQuery()
-                    .must(termQuery(ALERT_FIELD + "." + GUID, guid))
-            ).innerHit(new QueryInnerHitBuilder())
+                    .must(termQuery(ALERT_FIELD + "." + GUID, guid)),
+                    ScoreMode.None
+            ).innerHit(new InnerHitBuilder())
         )
         .must(termQuery(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()));
     return queryAllResults(qb);
@@ -379,7 +387,8 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
             .should(new QueryStringQueryBuilder(searchRequest.getQuery()))
             .should(nestedQuery(
                 ALERT_FIELD,
-                new QueryStringQueryBuilder(searchRequest.getQuery())
+                new QueryStringQueryBuilder(searchRequest.getQuery()),
+                ScoreMode.None
                 )
             )
         )
@@ -486,8 +495,9 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
             nestedQuery(
                 ALERT_FIELD,
                 boolQuery()
-                    .must(termQuery(ALERT_FIELD + "." + GUID, alertGuid))
-            ).innerHit(new QueryInnerHitBuilder())
+                    .must(termQuery(ALERT_FIELD + "." + Constants.GUID, alertGuid)),
+                ScoreMode.None
+            ).innerHit(new InnerHitBuilder())
         )
         .must(termQuery(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()));
     return queryAllResults(qb);
@@ -504,7 +514,7 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
     SearchRequestBuilder searchRequestBuilder = elasticsearchDao
         .getClient()
         .prepareSearch(index)
-        .addFields("*")
+        .addStoredField("*")
         .setFetchSource(true)
         .setQuery(qb)
         .setSize(pageSize);
@@ -586,6 +596,54 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
     } // else we have no updates, so don't do anything
   }
 
+  @SuppressWarnings("unchecked")
+  protected List<Map<String, Object>> getAllAlertsForMetaAlert(Document update) throws IOException {
+    Document latest = indexDao.getLatest(update.getGuid(), MetaAlertDao.METAALERT_TYPE);
+    if (latest == null) {
+      return new ArrayList<>();
+    }
+    List<String> guids = new ArrayList<>();
+    List<Map<String, Object>> latestAlerts = (List<Map<String, Object>>) latest.getDocument()
+        .get(MetaAlertDao.ALERT_FIELD);
+    for (Map<String, Object> alert : latestAlerts) {
+      guids.add((String) alert.get(Constants.GUID));
+    }
+
+    List<Map<String, Object>> alerts = new ArrayList<>();
+    QueryBuilder query = QueryBuilders.idsQuery().addIds(guids.toArray(new String[0]));
+    SearchRequestBuilder request = elasticsearchDao.getClient().prepareSearch()
+        .setQuery(query);
+    org.elasticsearch.action.search.SearchResponse response = request.get();
+    for (SearchHit hit : response.getHits().getHits()) {
+      alerts.add(hit.sourceAsMap());
+    }
+    return alerts;
+  }
+
+  /**
+   * Builds an update Document for adjusting an alert's list of associated meta alerts.
+   * @param alertGuid The GUID of the alert to update
+   * @param sensorType The sensor type of the alert being updated
+   * @param metaAlertField The new metaalerts list to set on the alert
+   * @param timestamp The timestamp to use for the update Document
+   * @return The update Document
+   */
+  protected Document buildAlertUpdate(String alertGuid, String sensorType,
+      List<String> metaAlertField, Long timestamp) {
+    Document alertUpdate;
+    Map<String, Object> document = new HashMap<>();
+    document.put(MetaAlertDao.METAALERT_FIELD, metaAlertField);
+    alertUpdate = new Document(
+        document,
+        alertGuid,
+        sensorType,
+        timestamp
+    );
+    return alertUpdate;
+  }
+
   @Override
   public Map<String, FieldType> getColumnMetadata(List<String> indices)
       throws IOException {

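The meta-alert changes above adapt nested queries to the Elasticsearch 5.x API: nestedQuery() now takes an explicit ScoreMode, and inner hits are attached with InnerHitBuilder instead of the removed QueryInnerHitBuilder. A minimal sketch of the pattern, with placeholder field names rather than Metron's constants:

    import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
    import static org.elasticsearch.index.query.QueryBuilders.nestedQuery;
    import static org.elasticsearch.index.query.QueryBuilders.termQuery;

    import org.apache.lucene.search.join.ScoreMode;
    import org.elasticsearch.index.query.InnerHitBuilder;
    import org.elasticsearch.index.query.QueryBuilder;

    public class NestedQuerySketch {
      public static QueryBuilder activeMetaAlertsContaining(String alertGuid) {
        return boolQuery()
            .must(nestedQuery(
                "alert",                                          // placeholder nested path
                boolQuery().must(termQuery("alert.guid", alertGuid)),
                ScoreMode.None)                                   // 5.x requires an explicit ScoreMode
                .innerHit(new InnerHitBuilder()))                 // replaces QueryInnerHitBuilder
            .must(termQuery("status", "active"));
      }
    }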
http://git-wip-us.apache.org/repos/asf/metron/blob/e8213918/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
index f29012a..4b73b84 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
@@ -17,19 +17,11 @@
  */
 package org.apache.metron.elasticsearch.utils;
 
+import static java.lang.String.format;
+
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import org.apache.commons.lang.StringUtils;
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.common.xcontent.XContentHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.lang.invoke.MethodHandles;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -41,8 +33,18 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-
-import static java.lang.String.format;
+import org.apache.commons.lang.StringUtils;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.netty.utils.NettyRuntimeWrapper;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ElasticsearchUtils {
 
@@ -106,7 +108,7 @@ public class ElasticsearchUtils {
   }
 
   public static TransportClient getClient(Map<String, Object> globalConfiguration, Map<String, String> optionalSettings) {
-    Settings.Builder settingsBuilder = Settings.settingsBuilder();
+    Settings.Builder settingsBuilder = Settings.builder();
     settingsBuilder.put("cluster.name", globalConfiguration.get("es.clustername"));
     settingsBuilder.put("client.transport.ping_timeout","500s");
     if (optionalSettings != null) {
@@ -115,7 +117,13 @@ public class ElasticsearchUtils {
     Settings settings = settingsBuilder.build();
     TransportClient client;
     try{
-      client = TransportClient.builder().settings(settings).build();
+      LOG.info("Number of available processors in Netty: {}", NettyRuntimeWrapper.availableProcessors());
+      // Netty sets available processors statically and if an attempt is made to set it more than
+      // once an IllegalStateException is thrown by NettyRuntime.setAvailableProcessors(NettyRuntime.java:87)
+      // https://discuss.elastic.co/t/getting-availableprocessors-is-already-set-to-1-rejecting-1-illegalstateexception-exception/103082
+      // https://discuss.elastic.co/t/elasticsearch-5-4-1-availableprocessors-is-already-set/88036
+      System.setProperty("es.set.netty.runtime.available.processors", "false");
+      client = new PreBuiltTransportClient(settings);
       for(HostnamePort hp : getIps(globalConfiguration)) {
         client.addTransportAddress(
                 new InetSocketTransportAddress(InetAddress.getByName(hp.hostname), hp.port)
@@ -196,9 +204,10 @@ public class ElasticsearchUtils {
   public static Optional<String> toJSON(org.elasticsearch.action.search.SearchRequest esRequest) {
     Optional<String> json = Optional.empty();
 
-    if(esRequest != null) {
+    if(esRequest != null && esRequest.source() != null) {
       try {
-        json = Optional.of(XContentHelper.convertToJson(esRequest.source(), true));
+        BytesReference requestBytes = esRequest.source().buildAsBytes();
+        json = Optional.of(XContentHelper.convertToJson(requestBytes, true));
 
       } catch (Throwable t) {
         LOG.error("Failed to convert search request to JSON", t);

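The getClient() change above switches to the 5.x PreBuiltTransportClient and disables Netty's processor detection before the client is constructed, avoiding the "availableProcessors is already set" IllegalStateException described in the comments. A minimal standalone sketch of that sequence; the host and cluster name are placeholder values, not Metron configuration.

    import java.net.InetAddress;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;
    import org.elasticsearch.transport.client.PreBuiltTransportClient;

    public class TransportClientSketch {
      public static TransportClient connect() throws Exception {
        // Must be set before the client is built, or Netty throws on re-initialization.
        System.setProperty("es.set.netty.runtime.available.processors", "false");
        Settings settings = Settings.builder()
            .put("cluster.name", "metron")                    // placeholder cluster name
            .put("client.transport.ping_timeout", "500s")
            .build();
        TransportClient client = new PreBuiltTransportClient(settings);
        client.addTransportAddress(
            new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));
        return client;
      }
    }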
http://git-wip-us.apache.org/repos/asf/metron/blob/e8213918/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
index bc9eccc..143bcf7 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
@@ -17,6 +17,12 @@
  */
 package org.apache.metron.elasticsearch.writer;
 
+import java.io.Serializable;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.interfaces.FieldNameConverter;
@@ -34,13 +40,6 @@ import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
 public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Serializable {
 
   private Map<String, String> optionalSettings;

http://git-wip-us.apache.org/repos/asf/metron/blob/e8213918/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchImportExportTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchImportExportTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchImportExportTest.java
new file mode 100644
index 0000000..ddec27c
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchImportExportTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.bulk;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.integration.utils.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ElasticsearchImportExportTest {
+
+
+  /**
+   *{"_index":".kibana","_type":"visualization","_id":"AV-Sj0e2hKs1cXXnFMqF","_score":1,"_source":{"title":"Welcome to Apache Metron"}}
+   *{"_index":".kibana","_type":"blah","_id":"MIKE-AV-Sj0e2hKs1cXXnFMqF","_score":1,"_source":{"title":"another Welcome to Apache Metron"}}
+   */
+  @Multiline
+  private static String records;
+
+  /**
+   *{ "create" : { "_id": "AV-Sj0e2hKs1cXXnFMqF", "_type": "visualization" } }
+   *{"title":"Welcome to Apache Metron"}
+   *{ "create" : { "_id": "MIKE-AV-Sj0e2hKs1cXXnFMqF", "_type": "blah" } }
+   *{"title":"another Welcome to Apache Metron"}
+   */
+  @Multiline
+  private static String expected;
+  private File tempDir;
+
+  @Before
+  public void setup() throws Exception {
+    tempDir = TestUtils.createTempDir(this.getClass().getName());
+  }
+
+  @Test
+  public void bulk_exporter_writes_elasticsearch_records_in_bulk_import_format() throws Exception {
+    Path recordsFile = Paths.get(tempDir.getPath(), "inputfile.json");
+    Path outputFile = Paths.get(tempDir.getPath(), "outputfile.json");
+    TestUtils.write(recordsFile.toFile(), records);
+
+    ElasticsearchImportExport tool = new ElasticsearchImportExport();
+    tool.bulkify(recordsFile, outputFile);
+    String actual = TestUtils.read(outputFile.toFile());
+    assertThat(actual, equalTo(expected));
+  }
+
+}

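The test above expects bulkify to turn each exported document line into the two-line bulk-import form: a create action line followed by the document source. A rough sketch of that per-record transformation using json-simple follows; it is illustrative only, not the actual ElasticsearchImportExport implementation.

    import org.json.simple.JSONObject;
    import org.json.simple.parser.JSONParser;

    public class BulkifySketch {
      @SuppressWarnings("unchecked")
      public static String toBulkPair(String exportedRecord) throws Exception {
        JSONObject record = (JSONObject) new JSONParser().parse(exportedRecord);
        JSONObject create = new JSONObject();
        create.put("_id", record.get("_id"));
        create.put("_type", record.get("_type"));
        JSONObject action = new JSONObject();
        action.put("create", create);
        JSONObject source = (JSONObject) record.get("_source");
        // One action line, then the raw document, as the bulk API expects.
        return action.toJSONString() + "\n" + source.toJSONString();
      }
    }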
http://git-wip-us.apache.org/repos/asf/metron/blob/e8213918/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
index a6c0aa6..2a6fb4f 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
@@ -17,14 +17,23 @@
  */
 package org.apache.metron.elasticsearch.dao;
 
-import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
 import org.apache.metron.indexing.dao.AccessConfig;
-import org.apache.metron.indexing.dao.search.FieldType;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.search.SortField;
 import org.apache.metron.indexing.dao.search.SortOrder;
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
+import org.apache.metron.indexing.dao.search.FieldType;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
@@ -35,18 +44,10 @@ import org.json.simple.parser.JSONParser;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 public class ElasticsearchDaoTest {
 
@@ -103,7 +104,7 @@ public class ElasticsearchDaoTest {
 
     // setup the column metadata
     Map<String, FieldType> columnMetadata = new HashMap<>();
-    columnMetadata.put("sortByStringDesc", FieldType.STRING);
+    columnMetadata.put("sortByStringDesc", FieldType.TEXT);
     columnMetadata.put("sortByIntAsc", FieldType.INTEGER);
 
     // setup the dao
@@ -148,7 +149,7 @@ public class ElasticsearchDaoTest {
       JSONObject sortBy = (JSONObject) aSortField.get("sortByStringDesc");
       assertEquals("desc", sortBy.get("order"));
       assertEquals("_last", sortBy.get("missing"));
-      assertEquals("string", sortBy.get("unmapped_type"));
+      assertEquals("text", sortBy.get("unmapped_type"));
     }
     {
       // sort by integer ascending
@@ -217,7 +218,7 @@ public class ElasticsearchDaoTest {
 
     SearchRequest searchRequest = new SearchRequest();
     searchRequest.setSize(maxSearchResults+1);
-
+    searchRequest.setQuery("");
     dao.search(searchRequest);
     // exception expected - size > max
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/e8213918/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
index 26f5fff..07019c3 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
@@ -24,6 +24,7 @@ import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchShardTarget;
 import org.junit.Test;
@@ -56,7 +57,7 @@ public class ElasticsearchRequestSubmitterTest {
 
     // mocks
     SearchResponse response = mock(SearchResponse.class);
-    SearchRequest request = mock(SearchRequest.class);
+    SearchRequest request = new SearchRequest();
 
     // response will have status of OK and no failed shards
     when(response.status()).thenReturn(RestStatus.OK);
@@ -74,7 +75,7 @@ public class ElasticsearchRequestSubmitterTest {
 
     // mocks
     SearchResponse response = mock(SearchResponse.class);
-    SearchRequest request = mock(SearchRequest.class);
+    SearchRequest request = new SearchRequest();
 
     // response will have status of OK
     when(response.status()).thenReturn(RestStatus.PARTIAL_CONTENT);
@@ -90,9 +91,9 @@ public class ElasticsearchRequestSubmitterTest {
   public void searchShouldHandleShardFailure() throws InvalidSearchException {
     // mocks
     SearchResponse response = mock(SearchResponse.class);
-    SearchRequest request = mock(SearchRequest.class);
+    SearchRequest request = new SearchRequest();
     ShardSearchFailure fail = mock(ShardSearchFailure.class);
-    SearchShardTarget target = mock(SearchShardTarget.class);
+    SearchShardTarget target = new SearchShardTarget("node1", mock(Index.class), 1, "metron");
 
     // response will have status of OK
     when(response.status()).thenReturn(RestStatus.OK);
@@ -107,7 +108,6 @@ public class ElasticsearchRequestSubmitterTest {
 
     // shard failure needs to report the node
     when(fail.shard()).thenReturn(target);
-    when(target.getNodeId()).thenReturn("node1");
 
     // shard failure needs to report details of failure
     when(fail.index()).thenReturn("bro_index_2017-10-11");

