flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject git commit: FLUME-1371: ElasticSearch Sink
Date Fri, 26 Oct 2012 17:51:10 GMT
Updated Branches:
  refs/heads/flume-1.4 30d9ca313 -> 2aabe6e39


FLUME-1371: ElasticSearch Sink

(Cameron Gandevia via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2aabe6e3
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2aabe6e3
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2aabe6e3

Branch: refs/heads/flume-1.4
Commit: 2aabe6e39005c5da2290ec3ca40ce365df88fe1c
Parents: 30d9ca3
Author: Brock Noland <brock@apache.org>
Authored: Fri Oct 26 12:45:42 2012 -0500
Committer: Brock Noland <brock@apache.org>
Committed: Fri Oct 26 12:46:03 2012 -0500

----------------------------------------------------------------------
 flume-ng-dist/pom.xml                              |    4 +
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |   44 ++
 flume-ng-sinks/flume-ng-elasticsearch-sink/pom.xml |   73 +++
 .../sink/elasticsearch/ContentBuilderUtil.java     |   76 +++
 .../ElasticSearchDynamicSerializer.java            |   73 +++
 .../ElasticSearchEventSerializer.java              |   48 ++
 .../ElasticSearchLogStashEventSerializer.java      |  145 ++++++
 .../sink/elasticsearch/ElasticSearchSink.java      |  385 +++++++++++++++
 .../elasticsearch/ElasticSearchSinkConstants.java  |   81 +++
 .../AbstractElasticSearchSinkTest.java             |  133 +++++
 .../ElasticSearchDynamicSerializerTest.java        |   64 +++
 .../ElasticSearchLogStashEventSerializerTest.java  |  122 +++++
 .../sink/elasticsearch/ElasticSearchSinkTest.java  |  223 +++++++++
 flume-ng-sinks/pom.xml                             |    1 +
 pom.xml                                            |   21 +
 15 files changed, 1493 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/2aabe6e3/flume-ng-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml
index a5512b0..8751f15 100644
--- a/flume-ng-dist/pom.xml
+++ b/flume-ng-dist/pom.xml
@@ -105,6 +105,10 @@
       <artifactId>flume-ng-hbase-sink</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.flume.flume-ng-sinks</groupId>
+      <artifactId>flume-ng-elasticsearch-sink</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.flume.flume-ng-sources</groupId>
       <artifactId>flume-scribe-source</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/flume/blob/2aabe6e3/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 29ead84..d24868d 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1338,6 +1338,49 @@ Example for agent named **agent_foo**:
   agent_foo.sinks.hbaseSink-1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
   agent_foo.sinks.hbaseSink-1.channel = memoryChannel-1
 
+ElasticSearchSink
+'''''''''''''''''
+
+This sink writes data to ElasticSearch. A class implementing
+ElasticSearchEventSerializer which is specified by the configuration is used to convert the events into
+XContentBuilder which detail the fields and mappings which will be indexed. These are then written
+to ElasticSearch. The sink will generate an index per day allowing easier management instead of dealing with 
+a single large index.
+The type is the FQCN: org.apache.flume.sink.elasticsearch.ElasticSearchSink
+Required properties are in **bold**.
+
+================  ==================================================================  =======================================================================================================
+Property Name     Default                                                             Description
+================  ==================================================================  =======================================================================================================
+**channel**       --
+**type**          --                                                                  The component type name, needs to be ``org.apache.flume.sink.elasticsearch.ElasticSearchSink``
+**hostNames**     --								      Comma separated list of hostname:port, if the port is not present the default port '9300' will be used
+indexName         flume                                                               The name of the index which the date will be appended to. Example 'flume' -> 'flume-yyyy-MM-dd'
+indexType	  logs                                                                The type to index the document to, defaults to 'log'
+clusterName       elasticsearch							      Name of the ElasticSearch cluster to connect to
+batchSize         100                                                                 Number of events to be written per txn.
+ttl               --                                                                  TTL in days, when set will cause the expired documents to be deleted automatically, 
+                                                                                      if not set documents will never be automatically deleted
+serializer        org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
+serializer.*      --                                                                  Properties to be passed to the serializer.
+================  ==================================================================  =======================================================================================================
+
+Example for agent named **agent_foo**:
+
+.. code-block:: properties
+
+  agent_foo.channels = memoryChannel-1
+  agent_foo.sinks = esSink-1
+  agent_foo.sinks.esSink-1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
+  agent_foo.sinks.esSink-1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
+  agent_foo.sinks.esSink-1.indexName = foo_index
+  agent_foo.sinks.esSink-1.indexType = bar_type
+  agent_foo.sinks.esSink-1.clusterName = foobar_cluster
+  agent_foo.sinks.esSink-1.batchSize = 500
+  agent_foo.sinks.esSink-1.ttl = 5
+  agent_foo.sinks.esSink-1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
+  agent_foo.sinks.esSink-1.channel = memoryChannel-1
+
 Custom Sink
 ~~~~~~~~~~~
 
@@ -2197,6 +2240,7 @@ org.apache.flume.Sink                     AVRO                org.apache.flume.s
 org.apache.flume.Sink                     HDFS                org.apache.flume.sink.hdfs.HDFSEventSink
 org.apache.flume.Sink                     --                  org.apache.flume.sink.hbase.HBaseSink
 org.apache.flume.Sink                     --                  org.apache.flume.sink.hbase.AsyncHBaseSink
+org.apache.flume.Sink                     --                  org.apache.flume.sink.elasticsearch.ElasticSearchSink
 org.apache.flume.Sink                     FILE_ROLL           org.apache.flume.sink.RollingFileSink
 org.apache.flume.Sink                     IRC                 org.apache.flume.sink.irc.IRCSink
 org.apache.flume.Sink                     --                  org.example.MySink

http://git-wip-us.apache.org/repos/asf/flume/blob/2aabe6e3/flume-ng-sinks/flume-ng-elasticsearch-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/pom.xml b/flume-ng-sinks/flume-ng-elasticsearch-sink/pom.xml
new file mode 100644
index 0000000..ab909b2
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  You under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+    <artifactId>flume-ng-sinks</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>1.4.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.flume.flume-ng-sinks</groupId>
+  <artifactId>flume-ng-elasticsearch-sink</artifactId>
+  <name>Flume NG ElasticSearch Sink</name>
+
+    <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+        <groupId>org.elasticsearch</groupId>
+        <artifactId>elasticsearch</artifactId>
+        <optional>true</optional>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/flume/blob/2aabe6e3/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
new file mode 100644
index 0000000..bf7c57c
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+import org.elasticsearch.common.jackson.core.JsonParseException;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentType;
+
+/**
+ * Utility methods for using ElasticSearch {@link XContentBuilder}
+ */
+public class ContentBuilderUtil {
+
+  private static final Charset charset = Charset.defaultCharset();
+
+  private ContentBuilderUtil() {
+  }
+
+  public static void appendField(XContentBuilder builder, String field,
+      byte[] data) throws IOException {
+    XContentType contentType = XContentFactory.xContentType(data);
+    if (contentType == null) {
+      addSimpleField(builder, field, data);
+    } else {
+      addComplexField(builder, field, contentType, data);
+    }
+  }
+
+  public static void addSimpleField(XContentBuilder builder, String fieldName,
+      byte[] data) throws IOException {
+    builder.field(fieldName, new String(data, charset));
+  }
+
+  public static void addComplexField(XContentBuilder builder, String fieldName,
+      XContentType contentType, byte[] data) throws IOException {
+    XContentParser parser = null;
+    try {
+      XContentBuilder tmp = jsonBuilder();
+      parser = XContentFactory.xContent(contentType).createParser(data);
+      parser.nextToken();
+      tmp.copyCurrentStructure(parser);
+      builder.field(fieldName, tmp);
+    } catch (JsonParseException ex) {
+      // If we get an exception here the most likely cause is nested JSON that
+      // can't be figured out in the body. The structured field has not been
+      // written at this point, so just push the raw data through as a string
+      addSimpleField(builder, fieldName, data);
+    } finally {
+      if (parser != null) {
+        parser.close();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/2aabe6e3/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializer.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializer.java
new file mode 100644
index 0000000..aa7ad39
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+/**
+ * Basic serializer that serializes the event body and header fields into
+ * individual fields.
+ * <p>
+ * A best effort will be used to determine the content-type, if it cannot be
+ * determined fields will be indexed as Strings
+ */
+public class ElasticSearchDynamicSerializer implements
+    ElasticSearchEventSerializer {
+
+  @Override
+  public void configure(Context context) {
+    // NO-OP...
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) {
+    // NO-OP...
+  }
+
+  @Override
+  public XContentBuilder getContentBuilder(Event event) throws IOException {
+    XContentBuilder builder = jsonBuilder().startObject();
+    appendBody(builder, event);
+    appendHeaders(builder, event);
+    return builder;
+  }
+
+  private void appendBody(XContentBuilder builder, Event event)
+      throws IOException {
+    ContentBuilderUtil.appendField(builder, "body", event.getBody());
+  }
+
+  private void appendHeaders(XContentBuilder builder, Event event)
+      throws IOException {
+    Map<String, String> headers = event.getHeaders();
+    for (String key : headers.keySet()) {
+      ContentBuilderUtil.appendField(builder, key,
+          headers.get(key).getBytes(charset));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/2aabe6e3/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java
new file mode 100644
index 0000000..dc6a093
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+import org.apache.flume.Event;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurableComponent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+/**
+ * Interface for an event serializer which serializes the headers and body of an
+ * event to write them to ElasticSearch. This is configurable, so any config
+ * params required should be taken through this.
+ */
+public interface ElasticSearchEventSerializer extends Configurable,
+    ConfigurableComponent {
+
+  public static final Charset charset = Charset.defaultCharset();
+
+  /**
+   * Return an {@link XContentBuilder} made up of the serialized flume event
+   * @param event
+   *          The flume event to serialize
+   * @return A {@link XContentBuilder} used to write to ElasticSearch
+   * @throws IOException
+   *           If an error occurs during serialization
+   */
+  abstract XContentBuilder getContentBuilder(Event event) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/2aabe6e3/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializer.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializer.java
new file mode 100644
index 0000000..3638368
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializer.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.elasticsearch.common.collect.Maps;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+/**
+ * Serialize flume events into the same format LogStash uses.
+ * <p>
+ * This can be used to send events to ElasticSearch and use clients such as
+ * Kibana which expect Logstash-formatted indexes.
+ *
+ * <pre>
+ * {
+ *    "@timestamp": "2010-12-21T21:48:33.309258Z",
+ *    "@tags": [ "array", "of", "tags" ],
+ *    "@type": "string",
+ *    "@source": "source of the event, usually a URL.",
+ *    "@source_host": "",
+ *    "@source_path": "",
+ *    "@fields":{
+ *       # a set of fields for this event
+ *       "user": "jordan",
+ *       "command": "shutdown -r"
+ *     },
+ *     "@message": "the original plain-text message"
+ *   }
+ * </pre>
+ *
+ * If the following headers are present, they will map to the above logstash
+ * output as long as the logstash fields are not already present.
+ *
+ * <pre>
+ *  timestamp: long -> @timestamp:Date
+ *  host: String -> @source_host: String
+ *  src_path: String -> @source_path: String
+ *  type: String -> @type: String
+ *  source: String -> @source: String
+ * </pre>
+ *
+ * @see <a
+ *      href="https://github.com/logstash/logstash/wiki/logstash%27s-internal-message-format">
+ *      logstash's internal message format</a>
+ */
+public class ElasticSearchLogStashEventSerializer implements
+    ElasticSearchEventSerializer {
+
+  @Override
+  public XContentBuilder getContentBuilder(Event event) throws IOException {
+    XContentBuilder builder = jsonBuilder().startObject();
+    appendBody(builder, event);
+    appendHeaders(builder, event);
+    return builder;
+  }
+
+  private void appendBody(XContentBuilder builder, Event event)
+      throws IOException, UnsupportedEncodingException {
+    byte[] body = event.getBody();
+    ContentBuilderUtil.appendField(builder, "@message", body);
+  }
+
+  private void appendHeaders(XContentBuilder builder, Event event)
+      throws IOException {
+    Map<String, String> headers = Maps.newHashMap(event.getHeaders());
+
+    String timestamp = headers.get("timestamp");
+    if (!StringUtils.isBlank(timestamp)
+        && StringUtils.isBlank(headers.get("@timestamp"))) {
+      long timestampMs = Long.parseLong(timestamp);
+      builder.field("@timestamp", new Date(timestampMs));
+    }
+
+    String source = headers.get("source");
+    if (!StringUtils.isBlank(source)
+        && StringUtils.isBlank(headers.get("@source"))) {
+      ContentBuilderUtil.appendField(builder, "@source",
+          source.getBytes(charset));
+    }
+
+    String type = headers.get("type");
+    if (!StringUtils.isBlank(type)
+        && StringUtils.isBlank(headers.get("@type"))) {
+      ContentBuilderUtil.appendField(builder, "@type", type.getBytes(charset));
+    }
+
+    String host = headers.get("host");
+    if (!StringUtils.isBlank(host)
+        && StringUtils.isBlank(headers.get("@source_host"))) {
+      ContentBuilderUtil.appendField(builder, "@source_host",
+          host.getBytes(charset));
+    }
+
+    String srcPath = headers.get("src_path");
+    if (!StringUtils.isBlank(srcPath)
+        && StringUtils.isBlank(headers.get("@source_path"))) {
+      ContentBuilderUtil.appendField(builder, "@source_path",
+          srcPath.getBytes(charset));
+    }
+
+    builder.startObject("@fields");
+    for (String key : headers.keySet()) {
+      byte[] val = headers.get(key).getBytes(charset);
+      ContentBuilderUtil.appendField(builder, key, val);
+    }
+    builder.endObject();
+  }
+
+  @Override
+  public void configure(Context context) {
+    // NO-OP...
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) {
+    // NO-OP...
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/2aabe6e3/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
new file mode 100644
index 0000000..26bad20
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.BATCH_SIZE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_CLUSTER_NAME;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_INDEX_NAME;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_INDEX_TYPE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_PORT;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_TTL;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.HOSTNAMES;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER_PREFIX;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.CounterGroup;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.flume.sink.AbstractSink;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+/**
+ * A sink which reads events from a channel and writes them to ElasticSearch
+ * based on the work done by https://github.com/Aconex/elasticflume.git.
+ * <p>
+ * This sink supports batch reading of events from the channel and writing them
+ * to ElasticSearch.
+ * <p>
+ * Indexes will be rolled daily using the format 'indexname-yyyy-MM-dd' to allow
+ * easier management of the index.
+ * <p>
+ * This sink must be configured with the mandatory parameters detailed in
+ * {@link ElasticSearchSinkConstants}.
+ * <p>
+ * It is recommended as a secondary step that the ElasticSearch indexes are
+ * optimized for the specified serializer. This is not handled by the sink but
+ * is typically done by deploying a config template alongside the ElasticSearch
+ * deploy.
+ *
+ * @see <a
+ *      href="http://www.elasticsearch.org/guide/reference/api/admin-indices-templates.html">
+ *      ElasticSearch index templates</a>
+ */
+public class ElasticSearchSink extends AbstractSink implements Configurable {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(ElasticSearchSink.class);
+
+  /** Formats the date suffix appended to the configured index name. */
+  static final FastDateFormat df = FastDateFormat.getInstance("yyyy-MM-dd");
+
+  /**
+   * Batch size used when {@link ElasticSearchSinkConstants#BATCH_SIZE} is not
+   * configured.
+   */
+  private static final int DEFAULT_BATCH_SIZE = 100;
+
+  // Used for testing
+  private final boolean isLocal;
+  private final CounterGroup counterGroup = new CounterGroup();
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+  private long ttlMs = DEFAULT_TTL;
+  private String clusterName = DEFAULT_CLUSTER_NAME;
+  private String indexName = DEFAULT_INDEX_NAME;
+  private String indexType = DEFAULT_INDEX_TYPE;
+
+  private InetSocketTransportAddress[] serverAddresses;
+
+  private Node node;
+  private Client client;
+  private ElasticSearchEventSerializer serializer;
+  private SinkCounter sinkCounter;
+
+  /**
+   * Create an {@link ElasticSearchSink} configured using the supplied
+   * configuration
+   */
+  public ElasticSearchSink() {
+    this(false);
+  }
+
+  /**
+   * Create an {@link ElasticSearchSink}.
+   *
+   * @param isLocal
+   *          If <tt>true</tt> sink will be configured to only talk to an
+   *          ElasticSearch instance hosted in the same JVM, should always be
+   *          false in production
+   */
+  @VisibleForTesting
+  ElasticSearchSink(boolean isLocal) {
+    this.isLocal = isLocal;
+  }
+
+  /** @return the transport addresses parsed from the host names setting */
+  @VisibleForTesting
+  InetSocketTransportAddress[] getServerAddresses() {
+    return serverAddresses;
+  }
+
+  /** @return the configured cluster name */
+  @VisibleForTesting
+  String getClusterName() {
+    return clusterName;
+  }
+
+  /**
+   * @return the configured index name followed by '-' and the current date
+   *         formatted as yyyy-MM-dd
+   */
+  @VisibleForTesting
+  String getIndexName() {
+    return indexName + "-" + df.format(new Date());
+  }
+
+  /** @return the configured index type */
+  @VisibleForTesting
+  String getIndexType() {
+    return indexType;
+  }
+
+  /** @return the document TTL in milliseconds, not positive when unset */
+  @VisibleForTesting
+  long getTTLMs() {
+    return ttlMs;
+  }
+
+  /**
+   * Takes up to {@code batchSize} events from the channel inside a single
+   * transaction and indexes them with one bulk request.
+   *
+   * @return {@code Status.BACKOFF} when fewer than {@code batchSize} events
+   *         were taken, otherwise {@code Status.READY}
+   * @throws EventDeliveryException
+   *           if the bulk response reports failures or another checked
+   *           exception is raised; the transaction is rolled back first
+   */
+  @Override
+  public Status process() throws EventDeliveryException {
+    logger.debug("processing...");
+    Status status = Status.READY;
+    Channel channel = getChannel();
+    Transaction txn = channel.getTransaction();
+    try {
+      txn.begin();
+      // Resolve the dated index once so the whole batch lands in one index.
+      String datedIndexName = getIndexName();
+      BulkRequestBuilder bulkRequest = client.prepareBulk();
+      for (int i = 0; i < batchSize; i++) {
+        Event event = channel.take();
+
+        if (event == null) {
+          break;
+        }
+
+        XContentBuilder builder = serializer.getContentBuilder(event);
+        IndexRequestBuilder request = client.prepareIndex(datedIndexName,
+            indexType).setSource(builder);
+
+        if (ttlMs > 0) {
+          request.setTTL(ttlMs);
+        }
+
+        bulkRequest.add(request);
+      }
+
+      int size = bulkRequest.numberOfActions();
+      if (size <= 0) {
+        sinkCounter.incrementBatchEmptyCount();
+        counterGroup.incrementAndGet("channel.underflow");
+        status = Status.BACKOFF;
+      } else {
+        if (size < batchSize) {
+          sinkCounter.incrementBatchUnderflowCount();
+          status = Status.BACKOFF;
+        } else {
+          sinkCounter.incrementBatchCompleteCount();
+        }
+
+        sinkCounter.addToEventDrainAttemptCount(size);
+
+        BulkResponse bulkResponse = bulkRequest.execute().actionGet();
+        if (bulkResponse.hasFailures()) {
+          throw new EventDeliveryException(bulkResponse.buildFailureMessage());
+        }
+      }
+      txn.commit();
+      sinkCounter.addToEventDrainSuccessCount(size);
+      counterGroup.incrementAndGet("transaction.success");
+    } catch (Throwable ex) {
+      try {
+        txn.rollback();
+        counterGroup.incrementAndGet("transaction.rollback");
+      } catch (Exception ex2) {
+        logger.error(
+            "Exception in rollback. Rollback might not have been successful.",
+            ex2);
+      }
+
+      logger.error("Failed to commit transaction. Transaction rolled back.",
+          ex);
+      if (ex instanceof Error || ex instanceof RuntimeException) {
+        // Errors and unchecked exceptions are rethrown unchanged.
+        Throwables.propagate(ex);
+      } else {
+        throw new EventDeliveryException(
+            "Failed to commit transaction. Transaction rolled back.", ex);
+      }
+    } finally {
+      txn.close();
+    }
+    return status;
+  }
+
+  /**
+   * Reads the sink settings named in {@link ElasticSearchSinkConstants} from
+   * the context, instantiates the configured serializer and validates the
+   * result. Settings that are blank keep their current value.
+   *
+   * @param context
+   *          the sink configuration
+   * @throws IllegalStateException
+   *           if a mandatory setting is missing or a value is out of range
+   */
+  @Override
+  public void configure(Context context) {
+    if (!isLocal) {
+      String[] hostNames = null;
+      if (StringUtils.isNotBlank(context.getString(HOSTNAMES))) {
+        hostNames = context.getString(HOSTNAMES).split(",");
+      }
+      Preconditions.checkState(hostNames != null && hostNames.length > 0,
+          "Missing Param:" + HOSTNAMES);
+
+      serverAddresses = new InetSocketTransportAddress[hostNames.length];
+      for (int i = 0; i < hostNames.length; i++) {
+        // Trim so that "a:9300, b:9300" does not yield a host named " b".
+        String[] hostPort = hostNames[i].trim().split(":");
+        String host = hostPort[0].trim();
+        int port = hostPort.length == 2 ? Integer.parseInt(hostPort[1].trim())
+            : DEFAULT_PORT;
+        serverAddresses[i] = new InetSocketTransportAddress(host, port);
+      }
+    }
+
+    if (StringUtils.isNotBlank(context.getString(INDEX_NAME))) {
+      this.indexName = context.getString(INDEX_NAME);
+    }
+
+    if (StringUtils.isNotBlank(context.getString(INDEX_TYPE))) {
+      this.indexType = context.getString(INDEX_TYPE);
+    }
+
+    if (StringUtils.isNotBlank(context.getString(CLUSTER_NAME))) {
+      this.clusterName = context.getString(CLUSTER_NAME);
+    }
+
+    if (StringUtils.isNotBlank(context.getString(BATCH_SIZE))) {
+      this.batchSize = Integer.parseInt(context.getString(BATCH_SIZE));
+    }
+
+    if (StringUtils.isNotBlank(context.getString(TTL))) {
+      // The setting is expressed in days; the sink works in milliseconds.
+      this.ttlMs = TimeUnit.DAYS.toMillis(Integer.parseInt(context
+          .getString(TTL)));
+      Preconditions.checkState(ttlMs > 0, TTL
+          + " must be greater than 0 or not set.");
+    }
+
+    String serializerClazz = "org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer";
+    if (StringUtils.isNotBlank(context.getString(SERIALIZER))) {
+      serializerClazz = context.getString(SERIALIZER);
+    }
+
+    Context serializerContext = new Context();
+    serializerContext.putAll(context.getSubProperties(SERIALIZER_PREFIX));
+
+    try {
+      // asSubclass rejects classes that are not serializers with a
+      // ClassCastException, which is reported by the catch block below.
+      Class<? extends ElasticSearchEventSerializer> clazz = Class.forName(
+          serializerClazz).asSubclass(ElasticSearchEventSerializer.class);
+      serializer = clazz.newInstance();
+      serializer.configure(serializerContext);
+    } catch (Exception e) {
+      logger.error("Could not instantiate event serializer.", e);
+      Throwables.propagate(e);
+    }
+
+    if (sinkCounter == null) {
+      sinkCounter = new SinkCounter(getName());
+    }
+
+    Preconditions.checkState(StringUtils.isNotBlank(indexName),
+        "Missing Param:" + INDEX_NAME);
+    Preconditions.checkState(StringUtils.isNotBlank(indexType),
+        "Missing Param:" + INDEX_TYPE);
+    Preconditions.checkState(StringUtils.isNotBlank(clusterName),
+        "Missing Param:" + CLUSTER_NAME);
+    Preconditions.checkState(batchSize >= 1, BATCH_SIZE
+        + " must be greater than 0");
+  }
+
+  /**
+   * Starts the sink counter and opens the ElasticSearch connection. A failure
+   * to connect is logged and counted but does not prevent the sink from
+   * starting.
+   */
+  @Override
+  public void start() {
+    logger.info("ElasticSearch sink {} started", getName());
+    sinkCounter.start();
+    try {
+      openConnection();
+    } catch (Exception ex) {
+      logger.error("ElasticSearch sink " + getName()
+          + " could not open a connection", ex);
+      sinkCounter.incrementConnectionFailedCount();
+      closeConnection();
+    }
+
+    super.start();
+  }
+
+  /**
+   * Closes the ElasticSearch connection and stops the sink counter.
+   */
+  @Override
+  public void stop() {
+    logger.info("ElasticSearch sink {} stopping", getName());
+    closeConnection();
+
+    sinkCounter.stop();
+    super.stop();
+  }
+
+  /**
+   * Opens either the in-JVM client (testing) or a transport client to the
+   * configured hosts and records the new connection on the sink counter.
+   */
+  private void openConnection() {
+    if (isLocal) {
+      logger.info("Using ElasticSearch AutoDiscovery mode");
+      openLocalDiscoveryClient();
+    } else {
+      logger.info("Using ElasticSearch hostnames: {} ",
+          Arrays.toString(serverAddresses));
+      openClient();
+    }
+    sinkCounter.incrementConnectionCreatedCount();
+  }
+
+  /*
+   * FOR TESTING ONLY...
+   *
+   * Opens a local discovery node for talking to an elasticsearch server running
+   * in the same JVM
+   */
+  private void openLocalDiscoveryClient() {
+    node = NodeBuilder.nodeBuilder().client(true).local(true).node();
+    client = node.client();
+  }
+
+  /**
+   * Creates a transport client for the configured cluster name and registers
+   * every configured server address with it.
+   */
+  private void openClient() {
+    Settings settings = ImmutableSettings.settingsBuilder()
+        .put("cluster.name", clusterName).build();
+
+    TransportClient transport = new TransportClient(settings);
+    for (InetSocketTransportAddress host : serverAddresses) {
+      transport.addTransportAddress(host);
+    }
+    client = transport;
+  }
+
+  /**
+   * Closes the client and the local node when present, clears both references
+   * and records the close on the sink counter.
+   */
+  private void closeConnection() {
+    if (client != null) {
+      client.close();
+    }
+    client = null;
+
+    if (node != null) {
+      node.close();
+    }
+    node = null;
+
+    sinkCounter.incrementConnectionClosedCount();
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/2aabe6e3/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java
new file mode 100644
index 0000000..7f75e22
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+/**
+ * Names of the configuration parameters read by the ElasticSearch sink and the
+ * default values it falls back to.
+ */
+public class ElasticSearchSinkConstants {
+
+  /**
+   * Comma separated list of hostname:port, if the port is not present the
+   * default port '9300' will be used.
+   * <p>
+   * Example:
+   * <pre>
+   *  127.0.0.1:9200,127.0.0.2:9300
+   * </pre>
+   */
+  public static final String HOSTNAMES = "hostNames";
+
+  /**
+   * The name to index the document to, defaults to 'flume'.
+   * <p>
+   * The current date in the format 'yyyy-MM-dd' will be appended to this name,
+   * for example 'foo' will result in a daily index of 'foo-yyyy-MM-dd'
+   */
+  public static final String INDEX_NAME = "indexName";
+
+  /**
+   * The type to index the document to, defaults to 'log'
+   */
+  public static final String INDEX_TYPE = "indexType";
+
+  /**
+   * Name of the ElasticSearch cluster to connect to
+   */
+  public static final String CLUSTER_NAME = "clusterName";
+
+  /**
+   * Maximum number of events the sink should take from the channel per
+   * transaction, if available. Defaults to 100
+   */
+  public static final String BATCH_SIZE = "batchSize";
+
+  /**
+   * TTL in days, when set will cause the expired documents to be deleted
+   * automatically, if not set documents will never be automatically deleted
+   */
+  public static final String TTL = "ttl";
+
+  /**
+   * The fully qualified class name of the serializer the sink should use.
+   */
+  public static final String SERIALIZER = "serializer";
+
+  /**
+   * Configuration to pass to the serializer.
+   */
+  public static final String SERIALIZER_PREFIX = SERIALIZER + ".";
+
+  /*
+   * DEFAULTS USED BY THE SINK
+   */
+
+  public static final int DEFAULT_PORT = 9300;
+  public static final int DEFAULT_TTL = -1;
+  public static final String DEFAULT_INDEX_NAME = "flume";
+  public static final String DEFAULT_INDEX_TYPE = "log";
+  public static final String DEFAULT_CLUSTER_NAME = "elasticsearch";
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/2aabe6e3/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
new file mode 100644
index 0000000..6d472cf
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.BATCH_SIZE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.collect.Maps;
+import org.elasticsearch.gateway.Gateway;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+import org.elasticsearch.node.internal.InternalNode;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+
+/**
+ * Shared fixture for sink tests: starts a local ElasticSearch node, wires a
+ * sink to a memory channel and offers search based assertions.
+ */
+public abstract class AbstractElasticSearchSinkTest {
+
+  static final String DEFAULT_INDEX_NAME = "flume";
+  static final String DEFAULT_INDEX_TYPE = "log";
+  static final String DEFAULT_CLUSTER_NAME = "elasticsearch";
+
+  Node node;
+  Client client;
+  String timestampedIndexName;
+  Map<String, String> parameters;
+
+  /**
+   * Populates {@link #parameters} with a baseline sink configuration and
+   * computes the dated index name the sink is expected to write to.
+   */
+  void initDefaults() {
+    parameters = Maps.newHashMap();
+    parameters.put(INDEX_NAME, DEFAULT_INDEX_NAME);
+    parameters.put(INDEX_TYPE, DEFAULT_INDEX_TYPE);
+    parameters.put(CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+    parameters.put(BATCH_SIZE, "1");
+    parameters.put(TTL, "5");
+
+    timestampedIndexName = DEFAULT_INDEX_NAME + "-"
+        + ElasticSearchSink.df.format(new Date());
+  }
+
+  /**
+   * Starts a local node, obtains its client and blocks until the cluster
+   * health request for green status returns.
+   */
+  void createNodes() throws Exception {
+    node = NodeBuilder.nodeBuilder().local(true).node();
+    client = node.client();
+
+    client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute()
+        .actionGet();
+  }
+
+  /**
+   * Resets the node's gateway, then closes the client and the node.
+   */
+  void shutdownNodes() throws Exception {
+    ((InternalNode) node).injector().getInstance(Gateway.class).reset();
+    client.close();
+    node.close();
+  }
+
+  /**
+   * Attaches a freshly configured memory channel to the sink and starts the
+   * sink.
+   *
+   * @return the channel the sink will read from
+   */
+  Channel bindAndStartChannel(ElasticSearchSink fixture) {
+    // Configure the channel
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+
+    // Wire them together
+    fixture.setChannel(channel);
+    fixture.start();
+    return channel;
+  }
+
+  /**
+   * Asserts the hit count of a match-all query and that the hits carry the
+   * bodies of the supplied events.
+   */
+  void assertMatchAllQuery(int expectedHits, Event... events) {
+    assertSearch(expectedHits, performSearch(QueryBuilders.matchAllQuery()),
+        events);
+  }
+
+  /**
+   * Asserts the hit count of a query on the "@message" field and that the hits
+   * carry the bodies of the supplied events.
+   */
+  void assertBodyQuery(int expectedHits, Event... events) {
+    // Perform Multi Field Match
+    // Forward the events; previously they were dropped, so the hit bodies
+    // were never compared.
+    assertSearch(expectedHits,
+        performSearch(QueryBuilders.fieldQuery("@message", "event")), events);
+  }
+
+  /**
+   * Runs the query against the dated index and the default index type.
+   */
+  SearchResponse performSearch(QueryBuilder query) {
+    return client.prepareSearch(timestampedIndexName)
+        .setTypes(DEFAULT_INDEX_TYPE).setQuery(query).execute().actionGet();
+  }
+
+  /**
+   * Asserts the total hit count and, after sorting the hits by their source
+   * string, that the i-th hit's "@message" equals the i-th event's body.
+   */
+  void assertSearch(int expectedHits, SearchResponse response, Event... events) {
+    SearchHits hitResponse = response.getHits();
+    assertEquals(expectedHits, hitResponse.getTotalHits());
+
+    SearchHit[] hits = hitResponse.getHits();
+    // Sort so the comparison does not depend on the order hits come back in.
+    Arrays.sort(hits, new Comparator<SearchHit>() {
+      @Override
+      public int compare(SearchHit o1, SearchHit o2) {
+        return o1.getSourceAsString().compareTo(o2.getSourceAsString());
+      }
+    });
+
+    for (int i = 0; i < events.length; i++) {
+      Event event = events[i];
+      SearchHit hit = hits[i];
+      Map<String, Object> source = hit.getSource();
+      assertEquals(new String(event.getBody()), source.get("@message"));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/2aabe6e3/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializerTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializerTest.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializerTest.java
new file mode 100644
index 0000000..3317734
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializerTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import static org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer.charset;
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.elasticsearch.common.collect.Maps;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.junit.Test;
+
+/**
+ * Verifies the JSON document produced by
+ * {@link ElasticSearchDynamicSerializer}.
+ */
+public class ElasticSearchDynamicSerializerTest {
+
+  /**
+   * Serializes an event with three headers and compares the result with a
+   * document built by hand: a "body" field followed by one field per header.
+   */
+  @Test
+  public void testRoundTrip() throws Exception {
+    ElasticSearchDynamicSerializer fixture = new ElasticSearchDynamicSerializer();
+    Context context = new Context();
+    fixture.configure(context);
+
+    String message = "test body";
+    Map<String, String> headers = Maps.newHashMap();
+    headers.put("headerNameOne", "headerValueOne");
+    headers.put("headerNameTwo", "headerValueTwo");
+    headers.put("headerNameThree", "headerValueThree");
+    Event event = EventBuilder.withBody(message.getBytes(charset));
+    event.setHeaders(headers);
+
+    // Encode and decode with the same charset; mixing in the platform default
+    // would corrupt the expected value whenever the two differ.
+    XContentBuilder expected = jsonBuilder().startObject();
+    expected.field("body", new String(message.getBytes(charset), charset));
+    for (Map.Entry<String, String> header : headers.entrySet()) {
+      expected.field(header.getKey(), new String(header.getValue().getBytes(
+          charset), charset));
+    }
+    expected.endObject();
+
+    XContentBuilder actual = fixture.getContentBuilder(event);
+
+    assertEquals(new String(expected.bytes().array()), new String(actual
+        .bytes().array()));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/2aabe6e3/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializerTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializerTest.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializerTest.java
new file mode 100644
index 0000000..a974e8b
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializerTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import static org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer.charset;
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.elasticsearch.common.collect.Maps;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.junit.Test;
+
+/**
+ * Verifies the JSON document produced by
+ * {@link ElasticSearchLogStashEventSerializer}.
+ */
+public class ElasticSearchLogStashEventSerializerTest {
+
+  /** A plain text body is emitted unchanged as "@message". */
+  @Test
+  public void testRoundTrip() throws Exception {
+    assertSerialized("test body");
+  }
+
+  /** A body that looks like JSON but is not valid is emitted as plain text. */
+  @Test
+  public void shouldHandleInvalidJSONDuringComplexParsing() throws Exception {
+    assertSerialized("{flume: somethingnotvalid}");
+  }
+
+  /**
+   * Serializes an event carrying the given body plus a fixed set of headers
+   * and compares the result with a document built by hand.
+   */
+  private void assertSerialized(String message) throws Exception {
+    ElasticSearchLogStashEventSerializer fixture = new ElasticSearchLogStashEventSerializer();
+    Context context = new Context();
+    fixture.configure(context);
+
+    Map<String, String> headers = Maps.newHashMap();
+    long timestamp = System.currentTimeMillis();
+    headers.put("timestamp", String.valueOf(timestamp));
+    headers.put("source", "flume_tail_src");
+    headers.put("host", "test@localhost");
+    headers.put("src_path", "/tmp/test");
+    headers.put("headerNameOne", "headerValueOne");
+    headers.put("headerNameTwo", "headerValueTwo");
+    headers.put("type", "sometype");
+    Event event = EventBuilder.withBody(message.getBytes(charset));
+    event.setHeaders(headers);
+
+    XContentBuilder expected = jsonBuilder().startObject();
+    // Encode and decode with the same charset so the expectation does not
+    // depend on the platform default.
+    expected.field("@message", new String(message.getBytes(charset), charset));
+    expected.field("@timestamp", new Date(timestamp));
+    expected.field("@source", "flume_tail_src");
+    expected.field("@type", "sometype");
+    expected.field("@source_host", "test@localhost");
+    expected.field("@source_path", "/tmp/test");
+    expected.startObject("@fields");
+    // Follow the event's own header iteration order instead of hard-coding
+    // the order one particular HashMap implementation happens to produce.
+    Map<String, String> eventHeaders = event.getHeaders();
+    for (String headerName : eventHeaders.keySet()) {
+      expected.field(headerName, eventHeaders.get(headerName));
+    }
+    expected.endObject();
+
+    expected.endObject();
+
+    XContentBuilder actual = fixture.getContentBuilder(event);
+    assertEquals(new String(expected.bytes().array()), new String(actual
+        .bytes().array()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/2aabe6e3/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkTest.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkTest.java
new file mode 100644
index 0000000..bb2f9f4
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.BATCH_SIZE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_PORT;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.HOSTNAMES;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Sink.Status;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ElasticSearchSinkTest extends AbstractElasticSearchSinkTest {
+
+  private ElasticSearchSink fixture;
+
+  @Before
+  public void init() throws Exception {
+    initDefaults();
+    createNodes();
+    fixture = new ElasticSearchSink(true);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    shutdownNodes();
+  }
+
+  @Test
+  public void shouldIndexOneEvent() throws Exception {
+    Configurables.configure(fixture, new Context(parameters));
+    Channel channel = bindAndStartChannel(fixture);
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    Event event = EventBuilder.withBody("event #1 or 1".getBytes());
+    channel.put(event);
+    tx.commit();
+    tx.close();
+
+    fixture.process();
+    fixture.stop();
+    client.admin().indices()
+        .refresh(Requests.refreshRequest(timestampedIndexName)).actionGet();
+
+    assertMatchAllQuery(1, event);
+    assertBodyQuery(1, event);
+  }
+
+  @Test
+  public void shouldIndexFiveEvents() throws Exception {
+    // Make it so we only need to call process once
+    parameters.put(BATCH_SIZE, "5");
+    Configurables.configure(fixture, new Context(parameters));
+    Channel channel = bindAndStartChannel(fixture);
+
+    int numberOfEvents = 5;
+    Event[] events = new Event[numberOfEvents];
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    for (int i = 0; i < numberOfEvents; i++) {
+      String body = "event #" + i + " of " + numberOfEvents;
+      Event event = EventBuilder.withBody(body.getBytes());
+      events[i] = event;
+      channel.put(event);
+    }
+    tx.commit();
+    tx.close();
+
+    fixture.process();
+    fixture.stop();
+    client.admin().indices()
+        .refresh(Requests.refreshRequest(timestampedIndexName)).actionGet();
+
+    assertMatchAllQuery(numberOfEvents, events);
+    assertBodyQuery(5, events);
+  }
+
+  @Test
+  public void shouldIndexFiveEventsOverThreeBatches() throws Exception {
+    parameters.put(BATCH_SIZE, "2");
+    Configurables.configure(fixture, new Context(parameters));
+    Channel channel = bindAndStartChannel(fixture);
+
+    int numberOfEvents = 5;
+    Event[] events = new Event[numberOfEvents];
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    for (int i = 0; i < numberOfEvents; i++) {
+      String body = "event #" + i + " of " + numberOfEvents;
+      Event event = EventBuilder.withBody(body.getBytes());
+      events[i] = event;
+      channel.put(event);
+    }
+    tx.commit();
+    tx.close();
+
+    int count = 0;
+    Status status = Status.READY;
+    while (status != Status.BACKOFF) {
+      count++;
+      status = fixture.process();
+    }
+    fixture.stop();
+
+    assertEquals(3, count);
+
+    client.admin().indices()
+        .refresh(Requests.refreshRequest(timestampedIndexName)).actionGet();
+    assertMatchAllQuery(numberOfEvents, events);
+    assertBodyQuery(5, events);
+  }
+
+  @Test
+  public void shouldParseConfiguration() {
+    parameters.put(HOSTNAMES, "10.5.5.27");
+    parameters.put(CLUSTER_NAME, "testing-cluster-name");
+    parameters.put(INDEX_NAME, "testing-index-name");
+    parameters.put(INDEX_TYPE, "testing-index-type");
+    parameters.put(TTL, "10");
+
+    fixture = new ElasticSearchSink();
+    fixture.configure(new Context(parameters));
+
+    InetSocketTransportAddress[] expected = { new InetSocketTransportAddress(
+        "10.5.5.27", DEFAULT_PORT) };
+
+    assertEquals("testing-cluster-name", fixture.getClusterName());
+    assertEquals(
+        "testing-index-name-" + ElasticSearchSink.df.format(new Date()),
+        fixture.getIndexName());
+    assertEquals("testing-index-type", fixture.getIndexType());
+    assertEquals(TimeUnit.DAYS.toMillis(10), fixture.getTTLMs());
+    assertArrayEquals(expected, fixture.getServerAddresses());
+  }
+
+  @Test
+  public void shouldParseConfigurationUsingDefaults() {
+    parameters.put(HOSTNAMES, "10.5.5.27");
+    parameters.remove(INDEX_NAME);
+    parameters.remove(INDEX_TYPE);
+    parameters.remove(CLUSTER_NAME);
+
+    fixture = new ElasticSearchSink();
+    fixture.configure(new Context(parameters));
+
+    InetSocketTransportAddress[] expected = { new InetSocketTransportAddress(
+        "10.5.5.27", DEFAULT_PORT) };
+
+    assertEquals(
+        DEFAULT_INDEX_NAME + "-" + ElasticSearchSink.df.format(new Date()),
+        fixture.getIndexName());
+    assertEquals(DEFAULT_INDEX_TYPE, fixture.getIndexType());
+    assertEquals(DEFAULT_CLUSTER_NAME, fixture.getClusterName());
+    assertArrayEquals(expected, fixture.getServerAddresses());
+  }
+
+  @Test
+  public void shouldParseMultipleHostUsingDefaultPorts() {
+    parameters.put(HOSTNAMES, "10.5.5.27,10.5.5.28,10.5.5.29");
+
+    fixture = new ElasticSearchSink();
+    fixture.configure(new Context(parameters));
+
+    InetSocketTransportAddress[] expected = {
+        new InetSocketTransportAddress("10.5.5.27", DEFAULT_PORT),
+        new InetSocketTransportAddress("10.5.5.28", DEFAULT_PORT),
+        new InetSocketTransportAddress("10.5.5.29", DEFAULT_PORT) };
+
+    assertArrayEquals(expected, fixture.getServerAddresses());
+  }
+
+  @Test
+  public void shouldParseMultipleHostAndPorts() {
+    parameters.put(HOSTNAMES, "10.5.5.27:9300,10.5.5.28:9301,10.5.5.29:9302");
+
+    fixture = new ElasticSearchSink();
+    fixture.configure(new Context(parameters));
+
+    InetSocketTransportAddress[] expected = {
+        new InetSocketTransportAddress("10.5.5.27", 9300),
+        new InetSocketTransportAddress("10.5.5.28", 9301),
+        new InetSocketTransportAddress("10.5.5.29", 9302) };
+
+    assertArrayEquals(expected, fixture.getServerAddresses());
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/2aabe6e3/flume-ng-sinks/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/pom.xml b/flume-ng-sinks/pom.xml
index 53a630a..7170348 100644
--- a/flume-ng-sinks/pom.xml
+++ b/flume-ng-sinks/pom.xml
@@ -44,5 +44,6 @@ limitations under the License.
     <module>flume-hdfs-sink</module>
     <module>flume-irc-sink</module>
     <module>flume-ng-hbase-sink</module>
+    <module>flume-ng-elasticsearch-sink</module>
   </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/flume/blob/2aabe6e3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index fa0a9dc..64684d1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -802,6 +802,21 @@ limitations under the License.
         <version>${hadoop.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.elasticsearch</groupId>
+        <artifactId>elasticsearch</artifactId>
+        <version>0.19.10</version>
+        <optional>true</optional>
+      </dependency>
+
+      <dependency>
+        <groupId>org.elasticsearch</groupId>
+        <artifactId>elasticsearch</artifactId>
+        <version>0.19.10</version>
+        <classifier>tests</classifier>
+        <scope>test</scope>
+      </dependency>
+
       <!-- internal module dependencies -->
 
       <dependency>
@@ -858,6 +873,12 @@ limitations under the License.
       </dependency>
 
       <dependency>
+        <groupId>org.apache.flume.flume-ng-sinks</groupId>
+        <artifactId>flume-ng-elasticsearch-sink</artifactId>
+        <version>1.4.0-SNAPSHOT</version>
+      </dependency>
+
+      <dependency>
         <groupId>org.apache.flume.flume-ng-sources</groupId>
         <artifactId>flume-scribe-source</artifactId>
         <version>1.4.0-SNAPSHOT</version>


Mime
View raw message