flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject git commit: FLUME-1690: Elastic Search Sink doesn't run its unit tests
Date Fri, 09 Nov 2012 19:27:28 GMT
Updated Branches:
  refs/heads/flume-1.4 0d7ab0a7d -> 84e472141


FLUME-1690: Elastic Search Sink doesn't run its unit tests

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/84e47214
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/84e47214
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/84e47214

Branch: refs/heads/flume-1.4
Commit: 84e472141b6f9375afe712f0d7259d20983dd93a
Parents: 0d7ab0a
Author: Brock Noland <brock@apache.org>
Authored: Fri Nov 9 13:24:17 2012 -0600
Committer: Brock Noland <brock@apache.org>
Committed: Fri Nov 9 13:26:43 2012 -0600

----------------------------------------------------------------------
 .../ElasticSearchDynamicSerializerTest.java        |   64 ----
 .../ElasticSearchLogStashEventSerializerTest.java  |  122 --------
 .../sink/elasticsearch/ElasticSearchSinkTest.java  |  223 ---------------
 .../TestElasticSearchDynamicSerializer.java        |   64 ++++
 .../TestElasticSearchLogStashEventSerializer.java  |  122 ++++++++
 .../sink/elasticsearch/TestElasticSearchSink.java  |  223 +++++++++++++++
 6 files changed, 409 insertions(+), 409 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/84e47214/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializerTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializerTest.java
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializerTest.java
deleted file mode 100644
index 3317734..0000000
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializerTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.sink.elasticsearch;
-
-import static org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer.charset;
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-import static org.junit.Assert.assertEquals;
-
-import java.util.Map;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.event.EventBuilder;
-import org.elasticsearch.common.collect.Maps;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.junit.Test;
-
-public class ElasticSearchDynamicSerializerTest {
-
-  @Test
-  public void testRoundTrip() throws Exception {
-    ElasticSearchDynamicSerializer fixture = new ElasticSearchDynamicSerializer();
-    Context context = new Context();
-    fixture.configure(context);
-
-    String message = "test body";
-    Map<String, String> headers = Maps.newHashMap();
-    headers.put("headerNameOne", "headerValueOne");
-    headers.put("headerNameTwo", "headerValueTwo");
-    headers.put("headerNameThree", "headerValueThree");
-    Event event = EventBuilder.withBody(message.getBytes(charset));
-    event.setHeaders(headers);
-
-    XContentBuilder expected = jsonBuilder().startObject();
-    expected.field("body", new String(message.getBytes(), charset));
-    for (String headerName : headers.keySet()) {
-      expected.field(headerName, new String(headers.get(headerName).getBytes(),
-          charset));
-    }
-    expected.endObject();
-
-    XContentBuilder actual = fixture.getContentBuilder(event);
-
-    assertEquals(new String(expected.bytes().array()), new String(actual
-        .bytes().array()));
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/84e47214/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializerTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializerTest.java
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializerTest.java
deleted file mode 100644
index a974e8b..0000000
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializerTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.sink.elasticsearch;
-
-import static org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer.charset;
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-import static org.junit.Assert.assertEquals;
-
-import java.util.Date;
-import java.util.Map;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.event.EventBuilder;
-import org.elasticsearch.common.collect.Maps;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.junit.Test;
-
-public class ElasticSearchLogStashEventSerializerTest {
-
-  @Test
-  public void testRoundTrip() throws Exception {
-    ElasticSearchLogStashEventSerializer fixture = new ElasticSearchLogStashEventSerializer();
-    Context context = new Context();
-    fixture.configure(context);
-
-    String message = "test body";
-    Map<String, String> headers = Maps.newHashMap();
-    long timestamp = System.currentTimeMillis();
-    headers.put("timestamp", String.valueOf(timestamp));
-    headers.put("source", "flume_tail_src");
-    headers.put("host", "test@localhost");
-    headers.put("src_path", "/tmp/test");
-    headers.put("headerNameOne", "headerValueOne");
-    headers.put("headerNameTwo", "headerValueTwo");
-    headers.put("type", "sometype");
-    Event event = EventBuilder.withBody(message.getBytes(charset));
-    event.setHeaders(headers);
-
-    XContentBuilder expected = jsonBuilder().startObject();
-    expected.field("@message", new String(message.getBytes(), charset));
-    expected.field("@timestamp", new Date(timestamp));
-    expected.field("@source", "flume_tail_src");
-    expected.field("@type", "sometype");
-    expected.field("@source_host", "test@localhost");
-    expected.field("@source_path", "/tmp/test");
-    expected.startObject("@fields");
-    expected.field("timestamp", String.valueOf(timestamp));
-    expected.field("src_path", "/tmp/test");
-    expected.field("host", "test@localhost");
-    expected.field("headerNameTwo", "headerValueTwo");
-    expected.field("source", "flume_tail_src");
-    expected.field("headerNameOne", "headerValueOne");
-    expected.field("type", "sometype");
-    expected.endObject();
-
-    expected.endObject();
-
-    XContentBuilder actual = fixture.getContentBuilder(event);
-    assertEquals(new String(expected.bytes().array()), new String(actual
-        .bytes().array()));
-  }
-
-  @Test
-  public void shouldHandleInvalidJSONDuringComplexParsing() throws Exception {
-    ElasticSearchLogStashEventSerializer fixture = new ElasticSearchLogStashEventSerializer();
-    Context context = new Context();
-    fixture.configure(context);
-
-    String message = "{flume: somethingnotvalid}";
-    Map<String, String> headers = Maps.newHashMap();
-    long timestamp = System.currentTimeMillis();
-    headers.put("timestamp", String.valueOf(timestamp));
-    headers.put("source", "flume_tail_src");
-    headers.put("host", "test@localhost");
-    headers.put("src_path", "/tmp/test");
-    headers.put("headerNameOne", "headerValueOne");
-    headers.put("headerNameTwo", "headerValueTwo");
-    headers.put("type", "sometype");
-    Event event = EventBuilder.withBody(message.getBytes(charset));
-    event.setHeaders(headers);
-
-    XContentBuilder expected = jsonBuilder().startObject();
-    expected.field("@message", new String(message.getBytes(), charset));
-    expected.field("@timestamp", new Date(timestamp));
-    expected.field("@source", "flume_tail_src");
-    expected.field("@type", "sometype");
-    expected.field("@source_host", "test@localhost");
-    expected.field("@source_path", "/tmp/test");
-    expected.startObject("@fields");
-    expected.field("timestamp", String.valueOf(timestamp));
-    expected.field("src_path", "/tmp/test");
-    expected.field("host", "test@localhost");
-    expected.field("headerNameTwo", "headerValueTwo");
-    expected.field("source", "flume_tail_src");
-    expected.field("headerNameOne", "headerValueOne");
-    expected.field("type", "sometype");
-    expected.endObject();
-
-    expected.endObject();
-
-    XContentBuilder actual = fixture.getContentBuilder(event);
-    assertEquals(new String(expected.bytes().array()), new String(actual
-        .bytes().array()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/84e47214/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkTest.java
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkTest.java
deleted file mode 100644
index bb2f9f4..0000000
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkTest.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.sink.elasticsearch;
-
-import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.BATCH_SIZE;
-import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME;
-import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_PORT;
-import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.HOSTNAMES;
-import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME;
-import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE;
-import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import java.util.Date;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.Sink.Status;
-import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.EventBuilder;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ElasticSearchSinkTest extends AbstractElasticSearchSinkTest {
-
-  private ElasticSearchSink fixture;
-
-  @Before
-  public void init() throws Exception {
-    initDefaults();
-    createNodes();
-    fixture = new ElasticSearchSink(true);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    shutdownNodes();
-  }
-
-  @Test
-  public void shouldIndexOneEvent() throws Exception {
-    Configurables.configure(fixture, new Context(parameters));
-    Channel channel = bindAndStartChannel(fixture);
-
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-    Event event = EventBuilder.withBody("event #1 or 1".getBytes());
-    channel.put(event);
-    tx.commit();
-    tx.close();
-
-    fixture.process();
-    fixture.stop();
-    client.admin().indices()
-        .refresh(Requests.refreshRequest(timestampedIndexName)).actionGet();
-
-    assertMatchAllQuery(1, event);
-    assertBodyQuery(1, event);
-  }
-
-  @Test
-  public void shouldIndexFiveEvents() throws Exception {
-    // Make it so we only need to call process once
-    parameters.put(BATCH_SIZE, "5");
-    Configurables.configure(fixture, new Context(parameters));
-    Channel channel = bindAndStartChannel(fixture);
-
-    int numberOfEvents = 5;
-    Event[] events = new Event[numberOfEvents];
-
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-    for (int i = 0; i < numberOfEvents; i++) {
-      String body = "event #" + i + " of " + numberOfEvents;
-      Event event = EventBuilder.withBody(body.getBytes());
-      events[i] = event;
-      channel.put(event);
-    }
-    tx.commit();
-    tx.close();
-
-    fixture.process();
-    fixture.stop();
-    client.admin().indices()
-        .refresh(Requests.refreshRequest(timestampedIndexName)).actionGet();
-
-    assertMatchAllQuery(numberOfEvents, events);
-    assertBodyQuery(5, events);
-  }
-
-  @Test
-  public void shouldIndexFiveEventsOverThreeBatches() throws Exception {
-    parameters.put(BATCH_SIZE, "2");
-    Configurables.configure(fixture, new Context(parameters));
-    Channel channel = bindAndStartChannel(fixture);
-
-    int numberOfEvents = 5;
-    Event[] events = new Event[numberOfEvents];
-
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-    for (int i = 0; i < numberOfEvents; i++) {
-      String body = "event #" + i + " of " + numberOfEvents;
-      Event event = EventBuilder.withBody(body.getBytes());
-      events[i] = event;
-      channel.put(event);
-    }
-    tx.commit();
-    tx.close();
-
-    int count = 0;
-    Status status = Status.READY;
-    while (status != Status.BACKOFF) {
-      count++;
-      status = fixture.process();
-    }
-    fixture.stop();
-
-    assertEquals(3, count);
-
-    client.admin().indices()
-        .refresh(Requests.refreshRequest(timestampedIndexName)).actionGet();
-    assertMatchAllQuery(numberOfEvents, events);
-    assertBodyQuery(5, events);
-  }
-
-  @Test
-  public void shouldParseConfiguration() {
-    parameters.put(HOSTNAMES, "10.5.5.27");
-    parameters.put(CLUSTER_NAME, "testing-cluster-name");
-    parameters.put(INDEX_NAME, "testing-index-name");
-    parameters.put(INDEX_TYPE, "testing-index-type");
-    parameters.put(TTL, "10");
-
-    fixture = new ElasticSearchSink();
-    fixture.configure(new Context(parameters));
-
-    InetSocketTransportAddress[] expected = { new InetSocketTransportAddress(
-        "10.5.5.27", DEFAULT_PORT) };
-
-    assertEquals("testing-cluster-name", fixture.getClusterName());
-    assertEquals(
-        "testing-index-name-" + ElasticSearchSink.df.format(new Date()),
-        fixture.getIndexName());
-    assertEquals("testing-index-type", fixture.getIndexType());
-    assertEquals(TimeUnit.DAYS.toMillis(10), fixture.getTTLMs());
-    assertArrayEquals(expected, fixture.getServerAddresses());
-  }
-
-  @Test
-  public void shouldParseConfigurationUsingDefaults() {
-    parameters.put(HOSTNAMES, "10.5.5.27");
-    parameters.remove(INDEX_NAME);
-    parameters.remove(INDEX_TYPE);
-    parameters.remove(CLUSTER_NAME);
-
-    fixture = new ElasticSearchSink();
-    fixture.configure(new Context(parameters));
-
-    InetSocketTransportAddress[] expected = { new InetSocketTransportAddress(
-        "10.5.5.27", DEFAULT_PORT) };
-
-    assertEquals(
-        DEFAULT_INDEX_NAME + "-" + ElasticSearchSink.df.format(new Date()),
-        fixture.getIndexName());
-    assertEquals(DEFAULT_INDEX_TYPE, fixture.getIndexType());
-    assertEquals(DEFAULT_CLUSTER_NAME, fixture.getClusterName());
-    assertArrayEquals(expected, fixture.getServerAddresses());
-  }
-
-  @Test
-  public void shouldParseMultipleHostUsingDefaultPorts() {
-    parameters.put(HOSTNAMES, "10.5.5.27,10.5.5.28,10.5.5.29");
-
-    fixture = new ElasticSearchSink();
-    fixture.configure(new Context(parameters));
-
-    InetSocketTransportAddress[] expected = {
-        new InetSocketTransportAddress("10.5.5.27", DEFAULT_PORT),
-        new InetSocketTransportAddress("10.5.5.28", DEFAULT_PORT),
-        new InetSocketTransportAddress("10.5.5.29", DEFAULT_PORT) };
-
-    assertArrayEquals(expected, fixture.getServerAddresses());
-  }
-
-  @Test
-  public void shouldParseMultipleHostAndPorts() {
-    parameters.put(HOSTNAMES, "10.5.5.27:9300,10.5.5.28:9301,10.5.5.29:9302");
-
-    fixture = new ElasticSearchSink();
-    fixture.configure(new Context(parameters));
-
-    InetSocketTransportAddress[] expected = {
-        new InetSocketTransportAddress("10.5.5.27", 9300),
-        new InetSocketTransportAddress("10.5.5.28", 9301),
-        new InetSocketTransportAddress("10.5.5.29", 9302) };
-
-    assertArrayEquals(expected, fixture.getServerAddresses());
-  }
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/84e47214/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java
new file mode 100644
index 0000000..43a4b12
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import static org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer.charset;
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.elasticsearch.common.collect.Maps;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.junit.Test;
+
+public class TestElasticSearchDynamicSerializer {
+
+  @Test
+  public void testRoundTrip() throws Exception {
+    ElasticSearchDynamicSerializer fixture = new ElasticSearchDynamicSerializer();
+    Context context = new Context();
+    fixture.configure(context);
+
+    String message = "test body";
+    Map<String, String> headers = Maps.newHashMap();
+    headers.put("headerNameOne", "headerValueOne");
+    headers.put("headerNameTwo", "headerValueTwo");
+    headers.put("headerNameThree", "headerValueThree");
+    Event event = EventBuilder.withBody(message.getBytes(charset));
+    event.setHeaders(headers);
+
+    XContentBuilder expected = jsonBuilder().startObject();
+    expected.field("body", new String(message.getBytes(), charset));
+    for (String headerName : headers.keySet()) {
+      expected.field(headerName, new String(headers.get(headerName).getBytes(),
+          charset));
+    }
+    expected.endObject();
+
+    XContentBuilder actual = fixture.getContentBuilder(event);
+
+    assertEquals(new String(expected.bytes().array()), new String(actual
+        .bytes().array()));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/84e47214/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java
new file mode 100644
index 0000000..9dff4b0
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import static org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer.charset;
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.elasticsearch.common.collect.Maps;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.junit.Test;
+
+public class TestElasticSearchLogStashEventSerializer {
+
+  @Test
+  public void testRoundTrip() throws Exception {
+    ElasticSearchLogStashEventSerializer fixture = new ElasticSearchLogStashEventSerializer();
+    Context context = new Context();
+    fixture.configure(context);
+
+    String message = "test body";
+    Map<String, String> headers = Maps.newHashMap();
+    long timestamp = System.currentTimeMillis();
+    headers.put("timestamp", String.valueOf(timestamp));
+    headers.put("source", "flume_tail_src");
+    headers.put("host", "test@localhost");
+    headers.put("src_path", "/tmp/test");
+    headers.put("headerNameOne", "headerValueOne");
+    headers.put("headerNameTwo", "headerValueTwo");
+    headers.put("type", "sometype");
+    Event event = EventBuilder.withBody(message.getBytes(charset));
+    event.setHeaders(headers);
+
+    XContentBuilder expected = jsonBuilder().startObject();
+    expected.field("@message", new String(message.getBytes(), charset));
+    expected.field("@timestamp", new Date(timestamp));
+    expected.field("@source", "flume_tail_src");
+    expected.field("@type", "sometype");
+    expected.field("@source_host", "test@localhost");
+    expected.field("@source_path", "/tmp/test");
+    expected.startObject("@fields");
+    expected.field("timestamp", String.valueOf(timestamp));
+    expected.field("src_path", "/tmp/test");
+    expected.field("host", "test@localhost");
+    expected.field("headerNameTwo", "headerValueTwo");
+    expected.field("source", "flume_tail_src");
+    expected.field("headerNameOne", "headerValueOne");
+    expected.field("type", "sometype");
+    expected.endObject();
+
+    expected.endObject();
+
+    XContentBuilder actual = fixture.getContentBuilder(event);
+    assertEquals(new String(expected.bytes().array()), new String(actual
+        .bytes().array()));
+  }
+
+  @Test
+  public void shouldHandleInvalidJSONDuringComplexParsing() throws Exception {
+    ElasticSearchLogStashEventSerializer fixture = new ElasticSearchLogStashEventSerializer();
+    Context context = new Context();
+    fixture.configure(context);
+
+    String message = "{flume: somethingnotvalid}";
+    Map<String, String> headers = Maps.newHashMap();
+    long timestamp = System.currentTimeMillis();
+    headers.put("timestamp", String.valueOf(timestamp));
+    headers.put("source", "flume_tail_src");
+    headers.put("host", "test@localhost");
+    headers.put("src_path", "/tmp/test");
+    headers.put("headerNameOne", "headerValueOne");
+    headers.put("headerNameTwo", "headerValueTwo");
+    headers.put("type", "sometype");
+    Event event = EventBuilder.withBody(message.getBytes(charset));
+    event.setHeaders(headers);
+
+    XContentBuilder expected = jsonBuilder().startObject();
+    expected.field("@message", new String(message.getBytes(), charset));
+    expected.field("@timestamp", new Date(timestamp));
+    expected.field("@source", "flume_tail_src");
+    expected.field("@type", "sometype");
+    expected.field("@source_host", "test@localhost");
+    expected.field("@source_path", "/tmp/test");
+    expected.startObject("@fields");
+    expected.field("timestamp", String.valueOf(timestamp));
+    expected.field("src_path", "/tmp/test");
+    expected.field("host", "test@localhost");
+    expected.field("headerNameTwo", "headerValueTwo");
+    expected.field("source", "flume_tail_src");
+    expected.field("headerNameOne", "headerValueOne");
+    expected.field("type", "sometype");
+    expected.endObject();
+
+    expected.endObject();
+
+    XContentBuilder actual = fixture.getContentBuilder(event);
+    assertEquals(new String(expected.bytes().array()), new String(actual
+        .bytes().array()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/84e47214/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
new file mode 100644
index 0000000..4faa5be
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.BATCH_SIZE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_PORT;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.HOSTNAMES;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Sink.Status;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ElasticSearchSink}: indexing events taken from a channel
+ * and parsing of the sink's configuration parameters.
+ *
+ * <p>The {@code parameters}, {@code client} and {@code timestampedIndexName}
+ * members, the {@code DEFAULT_INDEX_NAME}, {@code DEFAULT_INDEX_TYPE} and
+ * {@code DEFAULT_CLUSTER_NAME} constants, and the node, channel and assertion
+ * helpers used below are not declared in this class; they are inherited from
+ * {@link AbstractElasticSearchSinkTest}.
+ */
+public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
+
+  /** Sink under test; the configuration tests replace it with a new instance. */
+  private ElasticSearchSink fixture;
+
+  /**
+   * Runs the inherited default and node set-up, then creates the sink.
+   */
+  @Before
+  public void init() throws Exception {
+    initDefaults();
+    createNodes();
+    // NOTE(review): the meaning of the boolean flag is not visible from this
+    // file -- confirm against ElasticSearchSink's constructor.
+    fixture = new ElasticSearchSink(true);
+  }
+
+  /**
+   * Shuts down the nodes created in {@link #init()}.
+   */
+  @After
+  public void tearDown() throws Exception {
+    shutdownNodes();
+  }
+
+  /**
+   * A single event put on the channel is found by both the match-all and the
+   * body query after one call to {@code process()}.
+   */
+  @Test
+  public void shouldIndexOneEvent() throws Exception {
+    Configurables.configure(fixture, new Context(parameters));
+    Channel channel = bindAndStartChannel(fixture);
+
+    Event event = EventBuilder.withBody("event #1 or 1".getBytes());
+    putInTransaction(channel, event);
+
+    fixture.process();
+    fixture.stop();
+    refreshIndex();
+
+    assertMatchAllQuery(1, event);
+    assertBodyQuery(1, event);
+  }
+
+  /**
+   * Five events are found by both queries after a single call to
+   * {@code process()} when the batch size equals the number of events.
+   */
+  @Test
+  public void shouldIndexFiveEvents() throws Exception {
+    // Make it so we only need to call process once
+    parameters.put(BATCH_SIZE, "5");
+    Configurables.configure(fixture, new Context(parameters));
+    Channel channel = bindAndStartChannel(fixture);
+
+    int numberOfEvents = 5;
+    Event[] events = createEvents(numberOfEvents);
+    putInTransaction(channel, events);
+
+    fixture.process();
+    fixture.stop();
+    refreshIndex();
+
+    assertMatchAllQuery(numberOfEvents, events);
+    assertBodyQuery(numberOfEvents, events);
+  }
+
+  /**
+   * With a batch size of two, five events take exactly three calls to
+   * {@code process()} before the sink reports {@code BACKOFF}, and all five
+   * events are found by both queries.
+   */
+  @Test
+  public void shouldIndexFiveEventsOverThreeBatches() throws Exception {
+    parameters.put(BATCH_SIZE, "2");
+    Configurables.configure(fixture, new Context(parameters));
+    Channel channel = bindAndStartChannel(fixture);
+
+    int numberOfEvents = 5;
+    Event[] events = createEvents(numberOfEvents);
+    putInTransaction(channel, events);
+
+    int count = 0;
+    Status status = Status.READY;
+    // Bounded on purpose: a sink that never backs off must fail the count
+    // assertion below instead of spinning forever and hanging the build.
+    while (status != Status.BACKOFF && count <= numberOfEvents) {
+      count++;
+      status = fixture.process();
+    }
+    fixture.stop();
+
+    assertEquals(3, count);
+
+    refreshIndex();
+    assertMatchAllQuery(numberOfEvents, events);
+    assertBodyQuery(numberOfEvents, events);
+  }
+
+  /**
+   * Explicitly configured host, cluster name, index name, index type and TTL
+   * are reported back by the sink's getters. A TTL of {@code "10"} is expected
+   * to come back as ten days in milliseconds.
+   */
+  @Test
+  public void shouldParseConfiguration() {
+    parameters.put(HOSTNAMES, "10.5.5.27");
+    parameters.put(CLUSTER_NAME, "testing-cluster-name");
+    parameters.put(INDEX_NAME, "testing-index-name");
+    parameters.put(INDEX_TYPE, "testing-index-type");
+    parameters.put(TTL, "10");
+
+    fixture = new ElasticSearchSink();
+    fixture.configure(new Context(parameters));
+
+    InetSocketTransportAddress[] expected = { new InetSocketTransportAddress(
+        "10.5.5.27", DEFAULT_PORT) };
+
+    assertEquals("testing-cluster-name", fixture.getClusterName());
+    // NOTE(review): the expected suffix is formatted from the current date at
+    // assertion time; presumably this can mismatch if the day rolls over
+    // while the test runs -- confirm against ElasticSearchSink.getIndexName().
+    assertEquals(
+        "testing-index-name-" + ElasticSearchSink.df.format(new Date()),
+        fixture.getIndexName());
+    assertEquals("testing-index-type", fixture.getIndexType());
+    assertEquals(TimeUnit.DAYS.toMillis(10), fixture.getTTLMs());
+    assertArrayEquals(expected, fixture.getServerAddresses());
+  }
+
+  /**
+   * When index name, index type and cluster name are removed from the
+   * parameters, the sink reports the inherited {@code DEFAULT_*} values.
+   */
+  @Test
+  public void shouldParseConfigurationUsingDefaults() {
+    parameters.put(HOSTNAMES, "10.5.5.27");
+    parameters.remove(INDEX_NAME);
+    parameters.remove(INDEX_TYPE);
+    parameters.remove(CLUSTER_NAME);
+
+    fixture = new ElasticSearchSink();
+    fixture.configure(new Context(parameters));
+
+    InetSocketTransportAddress[] expected = { new InetSocketTransportAddress(
+        "10.5.5.27", DEFAULT_PORT) };
+
+    assertEquals(
+        DEFAULT_INDEX_NAME + "-" + ElasticSearchSink.df.format(new Date()),
+        fixture.getIndexName());
+    assertEquals(DEFAULT_INDEX_TYPE, fixture.getIndexType());
+    assertEquals(DEFAULT_CLUSTER_NAME, fixture.getClusterName());
+    assertArrayEquals(expected, fixture.getServerAddresses());
+  }
+
+  /**
+   * A comma-separated host list without ports yields one address per host,
+   * each using {@code DEFAULT_PORT}.
+   */
+  @Test
+  public void shouldParseMultipleHostUsingDefaultPorts() {
+    parameters.put(HOSTNAMES, "10.5.5.27,10.5.5.28,10.5.5.29");
+
+    fixture = new ElasticSearchSink();
+    fixture.configure(new Context(parameters));
+
+    InetSocketTransportAddress[] expected = {
+        new InetSocketTransportAddress("10.5.5.27", DEFAULT_PORT),
+        new InetSocketTransportAddress("10.5.5.28", DEFAULT_PORT),
+        new InetSocketTransportAddress("10.5.5.29", DEFAULT_PORT) };
+
+    assertArrayEquals(expected, fixture.getServerAddresses());
+  }
+
+  /**
+   * A comma-separated list of {@code host:port} entries yields one address
+   * per entry with the port given for that entry.
+   */
+  @Test
+  public void shouldParseMultipleHostAndPorts() {
+    parameters.put(HOSTNAMES, "10.5.5.27:9300,10.5.5.28:9301,10.5.5.29:9302");
+
+    fixture = new ElasticSearchSink();
+    fixture.configure(new Context(parameters));
+
+    InetSocketTransportAddress[] expected = {
+        new InetSocketTransportAddress("10.5.5.27", 9300),
+        new InetSocketTransportAddress("10.5.5.28", 9301),
+        new InetSocketTransportAddress("10.5.5.29", 9302) };
+
+    assertArrayEquals(expected, fixture.getServerAddresses());
+  }
+
+  /**
+   * Builds {@code numberOfEvents} events whose bodies read
+   * {@code "event #<i> of <numberOfEvents>"}, with {@code i} starting at 0.
+   *
+   * @param numberOfEvents how many events to create
+   * @return the created events, in index order
+   */
+  private static Event[] createEvents(int numberOfEvents) {
+    Event[] events = new Event[numberOfEvents];
+    for (int i = 0; i < numberOfEvents; i++) {
+      String body = "event #" + i + " of " + numberOfEvents;
+      events[i] = EventBuilder.withBody(body.getBytes());
+    }
+    return events;
+  }
+
+  /**
+   * Puts the given events on the channel inside a single transaction.
+   *
+   * <p>The transaction is rolled back if a put or the commit fails and is
+   * closed in every case, so a failure surfaces as the original exception
+   * rather than leaving an open transaction behind.
+   *
+   * @param channel channel to put the events on
+   * @param events events to put, in order
+   */
+  private static void putInTransaction(Channel channel, Event... events) {
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    try {
+      for (Event event : events) {
+        channel.put(event);
+      }
+      tx.commit();
+    } catch (RuntimeException e) {
+      tx.rollback();
+      throw e;
+    } finally {
+      tx.close();
+    }
+  }
+
+  /**
+   * Issues a refresh request for {@code timestampedIndexName} and waits for
+   * it to complete; called before the query assertions in each indexing test.
+   */
+  private void refreshIndex() {
+    client.admin().indices()
+        .refresh(Requests.refreshRequest(timestampedIndexName)).actionGet();
+  }
+}


Mime
View raw message