flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject [1/2] FLUME-2225. Elasticsearch Sink for ES HTTP API
Date Thu, 20 Mar 2014 23:17:42 GMT
Repository: flume
Updated Branches:
  refs/heads/flume-1.5 bf89b590e -> 042f53a5f


http://git-wip-us.apache.org/repos/asf/flume/blob/042f53a5/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimestampedEventTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimestampedEventTest.java
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimestampedEventTest.java
new file mode 100644
index 0000000..bef2ac6
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimestampedEventTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import com.google.common.collect.Maps;
+import org.apache.flume.event.SimpleEvent;
+import org.joda.time.DateTimeUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class TimestampedEventTest {
+  static final long FIXED_TIME_MILLIS = 123456789L;
+
+  @Before
+  public void setFixedJodaTime() {
+    DateTimeUtils.setCurrentMillisFixed(FIXED_TIME_MILLIS);
+  }
+
+  @Test
+  public void shouldEnsureTimestampHeaderPresentInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals(FIXED_TIME_MILLIS, timestampedEvent.getTimestamp());
+    assertEquals(String.valueOf(FIXED_TIME_MILLIS),
+            timestampedEvent.getHeaders().get("timestamp"));
+  }
+
+  @Test
+  public void shouldUseExistingTimestampHeaderInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+    Map<String, String> headersWithTimestamp = Maps.newHashMap();
+    headersWithTimestamp.put("timestamp", "-321");
+    base.setHeaders(headersWithTimestamp );
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals(-321L, timestampedEvent.getTimestamp());
+    assertEquals("-321", timestampedEvent.getHeaders().get("timestamp"));
+  }
+
+  @Test
+  public void shouldUseExistingAtTimestampHeaderInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+    Map<String, String> headersWithTimestamp = Maps.newHashMap();
+    headersWithTimestamp.put("@timestamp", "-999");
+    base.setHeaders(headersWithTimestamp );
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals(-999L, timestampedEvent.getTimestamp());
+    assertEquals("-999", timestampedEvent.getHeaders().get("@timestamp"));
+    assertNull(timestampedEvent.getHeaders().get("timestamp"));
+  }
+
+  @Test
+  public void shouldPreserveBodyAndNonTimestampHeadersInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+    base.setBody(new byte[] {1,2,3,4});
+    Map<String, String> headersWithTimestamp = Maps.newHashMap();
+    headersWithTimestamp.put("foo", "bar");
+    base.setHeaders(headersWithTimestamp );
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals("bar", timestampedEvent.getHeaders().get("foo"));
+    assertArrayEquals(base.getBody(), timestampedEvent.getBody());
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/042f53a5/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java
new file mode 100644
index 0000000..38e7399
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2014 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.elasticsearch.client;
+
+import java.util.Arrays;
+import static org.junit.Assert.assertEquals;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RoundRobinListTest {
+
+  private RoundRobinList<String> fixture;
+
+  @Before
+  public void setUp() {
+    fixture = new RoundRobinList<String>(Arrays.asList("test1", "test2"));
+  }
+
+  @Test
+  public void shouldReturnNextElement() {
+    assertEquals("test1", fixture.get());
+    assertEquals("test2", fixture.get());
+    assertEquals("test1", fixture.get());
+    assertEquals("test2", fixture.get());
+    assertEquals("test1", fixture.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/042f53a5/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java
new file mode 100644
index 0000000..4b70b65
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch.client;
+
+import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertThat;
+import org.mockito.Mock;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+public class TestElasticSearchClientFactory {
+
+  ElasticSearchClientFactory factory;
+  
+  @Mock
+  ElasticSearchEventSerializer serializer;
+
+  @Before
+  public void setUp() {
+    initMocks(this);
+    factory = new ElasticSearchClientFactory();
+  }
+
+  @Test
+  public void shouldReturnTransportClient() throws Exception {
+    String[] hostNames = { "127.0.0.1" };
+    Object o = factory.getClient(ElasticSearchClientFactory.TransportClient,
+            hostNames, "test", serializer, null);
+    assertThat(o, instanceOf(ElasticSearchTransportClient.class));
+  }
+
+  @Test
+  public void shouldReturnRestClient() throws NoSuchClientTypeException {
+    String[] hostNames = { "127.0.0.1" };
+    Object o = factory.getClient(ElasticSearchClientFactory.RestClient,
+            hostNames, "test", serializer, null);
+    assertThat(o, instanceOf(ElasticSearchRestClient.class));
+  }
+
+  @Test(expected=NoSuchClientTypeException.class)
+  public void shouldThrowNoSuchClientTypeException() throws NoSuchClientTypeException {
+    String[] hostNames = {"127.0.0.1"};
+    factory.getClient("not_existing_client", hostNames, "test", null, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/042f53a5/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java
new file mode 100644
index 0000000..b7d8822
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch.client;
+
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
+import org.apache.flume.sink.elasticsearch.IndexNameBuilder;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.BytesStream;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+
+import java.io.IOException;
+import java.util.List;
+
+import static junit.framework.Assert.assertEquals;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.StatusLine;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.common.bytes.BytesArray;
+import static org.mockito.Mockito.*;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+public class TestElasticSearchRestClient {
+
+  private ElasticSearchRestClient fixture;
+
+  @Mock
+  private ElasticSearchEventSerializer serializer;
+
+  @Mock
+  private IndexNameBuilder nameBuilder;
+  
+  @Mock
+  private Event event;
+
+  @Mock
+  private HttpClient httpClient;
+
+  @Mock
+  private HttpResponse httpResponse;
+
+  @Mock
+  private StatusLine httpStatus;
+
+  @Mock
+  private HttpEntity httpEntity;
+
+  private static final String INDEX_NAME = "foo_index";
+  private static final String MESSAGE_CONTENT = "{\"body\":\"test\"}";
+  private static final String[] HOSTS = {"host1", "host2"};
+
+  @Before
+  public void setUp() throws IOException {
+    initMocks(this);
+    BytesReference bytesReference = mock(BytesReference.class);
+    BytesStream bytesStream = mock(BytesStream.class);
+
+    when(nameBuilder.getIndexName(any(Event.class))).thenReturn(INDEX_NAME);
+    when(bytesReference.toBytesArray()).thenReturn(new BytesArray(MESSAGE_CONTENT));
+    when(bytesStream.bytes()).thenReturn(bytesReference);
+    when(serializer.getContentBuilder(any(Event.class))).thenReturn(bytesStream);
+    fixture = new ElasticSearchRestClient(HOSTS, serializer, httpClient);
+  }
+
+  @Test
+  public void shouldAddNewEventWithoutTTL() throws Exception {
+    ArgumentCaptor<HttpPost> argument = ArgumentCaptor.forClass(HttpPost.class);
+
+    when(httpStatus.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+    when(httpResponse.getStatusLine()).thenReturn(httpStatus);
+    when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(httpResponse);
+    
+    fixture.addEvent(event, nameBuilder, "bar_type", -1);
+    fixture.execute();
+
+    verify(httpClient).execute(isA(HttpUriRequest.class));
+    verify(httpClient).execute(argument.capture());
+
+    assertEquals("http://host1/_bulk", argument.getValue().getURI().toString());
+    assertEquals("{\"index\":{\"_type\":\"bar_type\",\"_index\":\"foo_index\"}}\n" + MESSAGE_CONTENT
+ "\n",
+            EntityUtils.toString(argument.getValue().getEntity()));
+  }
+
+  @Test
+  public void shouldAddNewEventWithTTL() throws Exception {
+    ArgumentCaptor<HttpPost> argument = ArgumentCaptor.forClass(HttpPost.class);
+
+    when(httpStatus.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+    when(httpResponse.getStatusLine()).thenReturn(httpStatus);
+    when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(httpResponse);
+
+    fixture.addEvent(event, nameBuilder, "bar_type", 123);
+    fixture.execute();
+
+    verify(httpClient).execute(isA(HttpUriRequest.class));
+    verify(httpClient).execute(argument.capture());
+
+    assertEquals("http://host1/_bulk", argument.getValue().getURI().toString());
+    assertEquals("{\"index\":{\"_type\":\"bar_type\",\"_index\":\"foo_index\",\"_ttl\":\"123\"}}\n"
+
+            MESSAGE_CONTENT + "\n", EntityUtils.toString(argument.getValue().getEntity()));
+  }
+
+  @Test(expected = EventDeliveryException.class)
+  public void shouldThrowEventDeliveryException() throws Exception {
+    ArgumentCaptor<HttpPost> argument = ArgumentCaptor.forClass(HttpPost.class);
+
+    when(httpStatus.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+    when(httpResponse.getStatusLine()).thenReturn(httpStatus);
+    when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(httpResponse);
+
+    fixture.addEvent(event, nameBuilder, "bar_type", 123);
+    fixture.execute();
+  }
+
+  @Test()
+  public void shouldRetryBulkOperation() throws Exception {
+    ArgumentCaptor<HttpPost> argument = ArgumentCaptor.forClass(HttpPost.class);
+
+    when(httpStatus.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_OK);
+    when(httpResponse.getStatusLine()).thenReturn(httpStatus);
+    when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(httpResponse);
+
+    fixture.addEvent(event, nameBuilder, "bar_type", 123);
+    fixture.execute();
+
+    verify(httpClient, times(2)).execute(isA(HttpUriRequest.class));
+    verify(httpClient, times(2)).execute(argument.capture());
+
+    List<HttpPost> allValues = argument.getAllValues();
+    assertEquals("http://host1/_bulk", allValues.get(0).getURI().toString());
+    assertEquals("http://host2/_bulk", allValues.get(1).getURI().toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/042f53a5/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchTransportClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchTransportClient.java
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchTransportClient.java
new file mode 100644
index 0000000..b7b8e74
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchTransportClient.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch.client;
+
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
+import org.apache.flume.sink.elasticsearch.IndexNameBuilder;
+import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.BytesStream;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.*;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+public class TestElasticSearchTransportClient {
+
+  private ElasticSearchTransportClient fixture;
+
+  @Mock
+  private ElasticSearchEventSerializer serializer;
+
+  @Mock
+  private IndexNameBuilder nameBuilder;
+
+  @Mock
+  private Client elasticSearchClient;
+
+  @Mock
+  private BulkRequestBuilder bulkRequestBuilder;
+
+  @Mock
+  private IndexRequestBuilder indexRequestBuilder;
+
+  @Mock
+  private Event event;
+
+  @Before
+  public void setUp() throws IOException {
+    initMocks(this);
+    BytesReference bytesReference = mock(BytesReference.class);
+    BytesStream bytesStream = mock(BytesStream.class);
+
+    when(nameBuilder.getIndexName(any(Event.class))).thenReturn("foo_index");
+    when(bytesReference.toBytes()).thenReturn("{\"body\":\"test\"}".getBytes());
+    when(bytesStream.bytes()).thenReturn(bytesReference);
+    when(serializer.getContentBuilder(any(Event.class)))
+        .thenReturn(bytesStream);
+    when(elasticSearchClient.prepareIndex(anyString(), anyString()))
+        .thenReturn(indexRequestBuilder);
+    when(indexRequestBuilder.setSource(bytesReference)).thenReturn(
+        indexRequestBuilder);
+
+    fixture = new ElasticSearchTransportClient(elasticSearchClient, serializer);
+    fixture.setBulkRequestBuilder(bulkRequestBuilder);
+  }
+
+  @Test
+  public void shouldAddNewEventWithoutTTL() throws Exception {
+    fixture.addEvent(event, nameBuilder, "bar_type", -1);
+    verify(indexRequestBuilder).setSource(
+        serializer.getContentBuilder(event).bytes());
+    verify(bulkRequestBuilder).add(indexRequestBuilder);
+  }
+
+  @Test
+  public void shouldAddNewEventWithTTL() throws Exception {
+    fixture.addEvent(event, nameBuilder, "bar_type", 10);
+    verify(indexRequestBuilder).setTTL(10);
+    verify(indexRequestBuilder).setSource(
+        serializer.getContentBuilder(event).bytes());
+  }
+
+  @Test
+  public void shouldExecuteBulkRequestBuilder() throws Exception {
+    ListenableActionFuture<BulkResponse> action =
+        (ListenableActionFuture<BulkResponse>) mock(ListenableActionFuture.class);
+    BulkResponse response = mock(BulkResponse.class);
+    when(bulkRequestBuilder.execute()).thenReturn(action);
+    when(action.actionGet()).thenReturn(response);
+    when(response.hasFailures()).thenReturn(false);
+
+    fixture.addEvent(event, nameBuilder, "bar_type", 10);
+    fixture.execute();
+    verify(bulkRequestBuilder).execute();
+  }
+
+  @Test(expected = EventDeliveryException.class)
+  public void shouldThrowExceptionOnExecuteFailed() throws Exception {
+    ListenableActionFuture<BulkResponse> action =
+        (ListenableActionFuture<BulkResponse>) mock(ListenableActionFuture.class);
+    BulkResponse response = mock(BulkResponse.class);
+    when(bulkRequestBuilder.execute()).thenReturn(action);
+    when(action.actionGet()).thenReturn(response);
+    when(response.hasFailures()).thenReturn(true);
+
+    fixture.addEvent(event, nameBuilder, "bar_type", 10);
+    fixture.execute();
+  }
+}


Mime
View raw message