flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject git commit: FLUME-2206. ElasticSearchSink ttl field modification to mimic Elasticsearch way of specifying TTL
Date Thu, 31 Oct 2013 06:14:08 GMT
Updated Branches:
  refs/heads/flume-1.5 93de6b837 -> e1dbe0eb1


FLUME-2206. ElasticSearchSink ttl field modification to mimic Elasticsearch way of specifying TTL

(Dib Ghosh via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e1dbe0eb
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e1dbe0eb
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e1dbe0eb

Branch: refs/heads/flume-1.5
Commit: e1dbe0eb1179af81c46a7774a3bfc0efd4705ce2
Parents: 93de6b8
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Wed Oct 30 23:13:09 2013 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Wed Oct 30 23:13:53 2013 -0700

----------------------------------------------------------------------
 .gitignore                                      |  1 +
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  7 ++-
 .../sink/elasticsearch/ElasticSearchSink.java   | 50 +++++++++++++++++++-
 .../ElasticSearchSinkConstants.java             |  1 +
 .../elasticsearch/TestElasticSearchSink.java    | 36 ++++++++++++++
 5 files changed, 91 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/e1dbe0eb/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index ef0a495..b387391 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,3 +17,4 @@ derby.log
 .idea
 *.iml
 nb-configuration.xml
+.DS_Store

http://git-wip-us.apache.org/repos/asf/flume/blob/e1dbe0eb/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index a768383..3a3038c 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1985,7 +1985,10 @@ indexType         logs
 clusterName       elasticsearch                                                         
  Name of the ElasticSearch cluster to connect to
 batchSize         100                                                                   
  Number of events to be written per txn.
 ttl               --                                                                    
  TTL in days, when set will cause the expired documents to be deleted automatically,
-                                                                                        
  if not set documents will never be automatically deleted
+                                                                                        
  if not set documents will never be automatically deleted. TTL is accepted both in the earlier form of
+                                                                                        
  integer only e.g. a1.sinks.k1.ttl = 5 and also with a qualifier ms (millisecond), s (second), m (minute),
+                                                                                        
  h (hour), d (day) and w (week). Example a1.sinks.k1.ttl = 5d will set TTL to 5 days. Follow
+                                                                                        
  http://www.elasticsearch.org/guide/reference/mapping/ttl-field/ for more information.
 serializer        org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer
The ElasticSearchIndexRequestBuilderFactory or ElasticSearchEventSerializer to use. Implementations of
                                                                                         
  either class are accepted but ElasticSearchIndexRequestBuilderFactory is preferred.
 serializer.*      --                                                                    
  Properties to be passed to the serializer.
@@ -2003,7 +2006,7 @@ Example for agent named a1:
   a1.sinks.k1.indexType = bar_type
   a1.sinks.k1.clusterName = foobar_cluster
   a1.sinks.k1.batchSize = 500
-  a1.sinks.k1.ttl = 5
+  a1.sinks.k1.ttl = 5d
   a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
   a1.sinks.k1.channel = c1
 

http://git-wip-us.apache.org/repos/asf/flume/blob/e1dbe0eb/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
index 3d01173..e38ab19 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
@@ -31,9 +31,12 @@ import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.IND
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER;
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER_PREFIX;
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL_REGEX;
 
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Channel;
@@ -98,6 +101,9 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
   private String clusterName = DEFAULT_CLUSTER_NAME;
   private String indexName = DEFAULT_INDEX_NAME;
   private String indexType = DEFAULT_INDEX_TYPE;
+  private final Pattern pattern
+    = Pattern.compile(TTL_REGEX, Pattern.CASE_INSENSITIVE);
+  private Matcher matcher = pattern.matcher("");
 
   private InetSocketTransportAddress[] serverAddresses;
 
@@ -269,8 +275,7 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
     }
 
     if (StringUtils.isNotBlank(context.getString(TTL))) {
-      this.ttlMs = TimeUnit.DAYS.toMillis(Integer.parseInt(context
-          .getString(TTL)));
+      this.ttlMs = parseTTL(context.getString(TTL));
       Preconditions.checkState(ttlMs > 0, TTL
           + " must be greater than 0 or not set.");
     }
@@ -354,6 +359,47 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
   }
 
   /*
+   * Returns TTL value of ElasticSearch index in milliseconds
+   * when TTL specifier is "ms" / "s" / "m" / "h" / "d" / "w".
+   * In case of unknown specifier TTL is not set. When specifier
+   * is not provided it defaults to days in milliseconds where the number
+   * of days is parsed integer from TTL string provided by user.
+   * <p>
+   *     Elasticsearch supports ttl values being provided in the format: 1d / 1w / 1ms / 1s / 1h / 1m
+   *     specify a time unit like d (days), m (minutes), h (hours), ms (milliseconds) or w (weeks),
+   *     milliseconds is used as default unit.
+   *     http://www.elasticsearch.org/guide/reference/mapping/ttl-field/.
+   * @param   ttl TTL value provided by user in flume configuration file for the sink
+   * @return  the ttl value in milliseconds
+  */
+  private long parseTTL(String ttl){
+    matcher = matcher.reset(ttl);
+    while (matcher.find()) {
+      if (matcher.group(2).equals("ms")) {
+        return Long.parseLong(matcher.group(1));
+      } else if (matcher.group(2).equals("s")) {
+        return TimeUnit.SECONDS.toMillis(Integer.parseInt(matcher.group(1)));
+      } else if (matcher.group(2).equals("m")) {
+        return TimeUnit.MINUTES.toMillis(Integer.parseInt(matcher.group(1)));
+      } else if (matcher.group(2).equals("h")) {
+        return TimeUnit.HOURS.toMillis(Integer.parseInt(matcher.group(1)));
+      } else if (matcher.group(2).equals("d")) {
+        return TimeUnit.DAYS.toMillis(Integer.parseInt(matcher.group(1)));
+      } else if (matcher.group(2).equals("w")) {
+        return TimeUnit.DAYS.toMillis(7 * Integer.parseInt(matcher.group(1)));
+      } else if (matcher.group(2).equals("")) {
+        logger.info("TTL qualifier is empty. Defaulting to day qualifier.");
+        return TimeUnit.DAYS.toMillis(Integer.parseInt(matcher.group(1)));
+      } else {
+        logger.debug("Unknown TTL qualifier provided. Setting TTL to 0.");
+        return 0;
+      }
+    }
+    logger.info("TTL not provided. Skipping the TTL config by returning 0.");
+    return 0;
+  }
+
+  /*
    * FOR TESTING ONLY...
    *
    * Opens a local discovery node for talking to an elasticsearch server running

http://git-wip-us.apache.org/repos/asf/flume/blob/e1dbe0eb/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java
index 7f75e22..dd0c59d 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java
@@ -78,4 +78,5 @@ public class ElasticSearchSinkConstants {
   public static final String DEFAULT_INDEX_NAME = "flume";
   public static final String DEFAULT_INDEX_TYPE = "log";
   public static final String DEFAULT_CLUSTER_NAME = "elasticsearch";
+  public static final String TTL_REGEX = "^(\\d+)(\\D*)";
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/e1dbe0eb/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
index 3f2ec6e..71789e8 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
@@ -31,6 +31,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 
@@ -286,6 +288,40 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
     assertTrue(CustomElasticSearchIndexRequestBuilderFactory.hasContext);
   }
 
+  @Test
+  public void shouldParseFullyQualifiedTTLs(){
+    Map<String, Long> testTTLMap = new HashMap<String, Long>();
+    testTTLMap.put("1ms", Long.valueOf(1));
+    testTTLMap.put("1s", Long.valueOf(1000));
+    testTTLMap.put("1m", Long.valueOf(60000));
+    testTTLMap.put("1h", Long.valueOf(3600000));
+    testTTLMap.put("1d", Long.valueOf(86400000));
+    testTTLMap.put("1w", Long.valueOf(604800000));
+    testTTLMap.put("1",  Long.valueOf(86400000));
+
+    parameters.put(HOSTNAMES, "10.5.5.27");
+    parameters.put(CLUSTER_NAME, "testing-cluster-name");
+    parameters.put(INDEX_NAME, "testing-index-name");
+    parameters.put(INDEX_TYPE, "testing-index-type");
+
+    for (String ttl : testTTLMap.keySet()) {
+      parameters.put(TTL, ttl);
+      fixture = new ElasticSearchSink();
+      fixture.configure(new Context(parameters));
+
+      InetSocketTransportAddress[] expected = {new InetSocketTransportAddress(
+        "10.5.5.27", DEFAULT_PORT)};
+
+      assertEquals("testing-cluster-name", fixture.getClusterName());
+      assertEquals("testing-index-name", fixture.getIndexName());
+      assertEquals("testing-index-type", fixture.getIndexType());
+      System.out.println("TTL MS" + Long.toString(testTTLMap.get(ttl)));
+      assertEquals((long) testTTLMap.get(ttl), fixture.getTTLMs());
+      assertArrayEquals(expected, fixture.getServerAddresses());
+
+    }
+  }
+
   public static final class CustomElasticSearchIndexRequestBuilderFactory
       extends AbstractElasticSearchIndexRequestBuilderFactory {
 


Mime
View raw message