flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject git commit: FLUME-1509. HDFS sink should allow for the use of different timezones when resolving sink paths.
Date Tue, 04 Sep 2012 22:45:42 GMT
Updated Branches:
  refs/heads/trunk 873b032fa -> 80699c462


FLUME-1509. HDFS sink should allow for the use of different timezones when resolving sink
paths.

(Jonathan Natkins via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/80699c46
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/80699c46
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/80699c46

Branch: refs/heads/trunk
Commit: 80699c46275fd8ad4be30de3cd1e3cd746b3ac26
Parents: 873b032
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Tue Sep 4 15:34:05 2012 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Tue Sep 4 15:34:05 2012 -0700

----------------------------------------------------------------------
 .../apache/flume/formatter/output/BucketPath.java  |   32 +++++++++++++-
 .../flume/formatter/output/TestBucketPath.java     |   15 +++++++
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |    1 +
 .../org/apache/flume/sink/hdfs/HDFSEventSink.java  |    6 ++-
 .../apache/flume/sink/hdfs/TestHDFSEventSink.java  |    1 +
 5 files changed, 51 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/80699c46/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
index cf105c7..fcc26f2 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
@@ -23,6 +23,7 @@ import java.util.Calendar;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.TimeZone;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -116,6 +117,16 @@ public class BucketPath {
   }
 
   /**
+   * A wrapper around
+   * {@link BucketPath#replaceShorthand(char, Map, TimeZone, boolean, int, int)}
+   * with the timezone set to the default.
+   */
+  public static String replaceShorthand(char c, Map<String, String> headers,
+      boolean needRounding, int unit, int roundDown) {
+    return replaceShorthand(c, headers, null, needRounding, unit, roundDown);
+  }
+
+  /**
    * Hardcoded lookups for %x style escape replacement. Add your own!
    *
    * All shorthands are Date format strings, currently.
@@ -125,6 +136,7 @@ public class BucketPath {
    * Dates follow the same format as unix date, with a few exceptions.
    * @param c - The character to replace.
    * @param headers - Event headers
+   * @param timeZone - The timezone to use for formatting the timestamp
    * @param needRounding - Should the timestamp be rounded down?
    * @param unit - if needRounding is true, what unit to round down to. This
    * must be one of the units specified by {@link java.util.Calendar} -
@@ -138,7 +150,7 @@ public class BucketPath {
    * @return
    */
   public static String replaceShorthand(char c, Map<String, String> headers,
-      boolean needRounding, int unit, int roundDown) {
+      TimeZone timeZone, boolean needRounding, int unit, int roundDown) {
 
     String timestampHeader = headers.get("timestamp");
     long ts;
@@ -229,6 +241,10 @@ public class BucketPath {
     }
 
     SimpleDateFormat format = new SimpleDateFormat(formatString);
+    if (timeZone != null) {
+      format.setTimeZone(timeZone);
+    }
+
     Date date = new Date(ts);
     return format.format(date);
   }
@@ -272,6 +288,16 @@ public class BucketPath {
   }
 
   /**
+   * A wrapper around
+   * {@link BucketPath#escapeString(String, Map, TimeZone, boolean, int, int)}
+   * with the timezone set to the default.
+   */
+  public static String escapeString(String in, Map<String, String> headers,
+      boolean needRounding, int unit, int roundDown) {
+    return escapeString(in, headers, null, needRounding, unit, roundDown);
+  }
+
+  /**
    * Replace all substrings of form %{tagname} with get(tagname).toString() and
    * all shorthand substrings of form %x with a special value.
    *
@@ -293,7 +319,7 @@ public class BucketPath {
    * @return Escaped string.
    */
   public static String escapeString(String in, Map<String, String> headers,
-      boolean needRounding, int unit, int roundDown) {
+      TimeZone timeZone, boolean needRounding, int unit, int roundDown) {
     Matcher matcher = tagPattern.matcher(in);
     StringBuffer sb = new StringBuffer();
     while (matcher.find()) {
@@ -314,7 +340,7 @@ public class BucketPath {
             && matcher.group(1).length() == 1,
             "Expected to match single character tag in string " + in);
         char c = matcher.group(1).charAt(0);
-        replacement = replaceShorthand(c, headers,
+        replacement = replaceShorthand(c, headers, timeZone,
             needRounding, unit, roundDown);
       }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/80699c46/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java b/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
index 86f3293..090b3a8 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
@@ -24,6 +24,7 @@ import java.util.Calendar;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.TimeZone;
 
 import org.junit.Assert;
 import org.junit.Before;
@@ -97,4 +98,18 @@ public class TestBucketPath {
     Assert.assertEquals(expectedString, escapedString);
   }
 
+  @Test
+  public void testDateFormatTimeZone(){
+    TimeZone utcTimeZone = TimeZone.getTimeZone("UTC");
+    String test = "%c";
+    String escapedString = BucketPath.escapeString(
+        test, headers, utcTimeZone, false, Calendar.HOUR_OF_DAY, 12);
+    System.out.println("Escaped String: " + escapedString);
+    SimpleDateFormat format = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
+    format.setTimeZone(utcTimeZone);
+    Date d = new Date(cal.getTimeInMillis());
+    String expectedString = format.format(d);
+    System.out.println("Expected String: "+ expectedString);
+    Assert.assertEquals(expectedString, escapedString);
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/80699c46/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 50bdc6c..ffed72b 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -984,6 +984,7 @@ Name                    Default       Description
 **channel**             --
 **type**                --            The component type name, needs to be ``hdfs``
 **hdfs.path**           --            HDFS directory path (eg hdfs://namenode/flume/webdata/)
+hdfs.timeZone           Local Time    Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.
 hdfs.filePrefix         FlumeData     Name prefixed to files created by Flume in hdfs directory
 hdfs.rollInterval       30            Number of seconds to wait before rolling current file
                                       (0 = never roll based on time interval)

http://git-wip-us.apache.org/repos/asf/flume/blob/80699c46/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index fcb9642..9a76ecb 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.TimeZone;
 import java.util.Map.Entry;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
@@ -108,6 +109,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   private CompressionType compType;
   private String fileType;
   private String path;
+  private TimeZone timeZone;
   private int maxOpenFiles;
   private String writeFormat;
   private ExecutorService callTimeoutPool;
@@ -177,6 +179,8 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
         context.getString("hdfs.path"), "hdfs.path is required");
     String fileName = context.getString("hdfs.filePrefix", defaultFileName);
     this.path = dirpath + System.getProperty("file.separator") + fileName;
+    String tzName = context.getString("hdfs.timeZone");
+    timeZone = tzName == null ? null : TimeZone.getTimeZone(tzName);
     rollInterval = context.getLong("hdfs.rollInterval", defaultRollInterval);
     rollSize = context.getLong("hdfs.rollSize", defaultRollSize);
     rollCount = context.getLong("hdfs.rollCount", defaultRollCount);
@@ -387,7 +391,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
 
         // reconstruct the path name by substituting place holders
         String realPath = BucketPath.escapeString(path, event.getHeaders(),
-            needRounding, roundUnit, roundValue);
+            timeZone, needRounding, roundUnit, roundValue);
         BucketWriter bucketWriter = sfWriters.get(realPath);
 
         // we haven't seen this file yet, so open it and cache the handle

http://git-wip-us.apache.org/repos/asf/flume/blob/80699c46/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index b5f8c88..ba30d01 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -509,6 +509,7 @@ public class TestHDFSEventSink {
     Context context = new Context();
 
     context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
+    context.put("hdfs.timeZone", "UTC");
     context.put("hdfs.filePrefix", fileName);
     context.put("hdfs.txnEventMax", String.valueOf(txnMax));
     context.put("hdfs.rollCount", String.valueOf(rollCount));


Mime
View raw message