streams-commits mailing list archives

From sblack...@apache.org
Subject [1/2] incubator-streams git commit: resolves STREAMS-322
Date Fri, 19 Jun 2015 22:17:31 GMT
Repository: incubator-streams
Updated Branches:
  refs/heads/master 7206560e5 -> 1dc8ce196


resolves STREAMS-322


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/78f0b4c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/78f0b4c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/78f0b4c1

Branch: refs/heads/master
Commit: 78f0b4c1c7fe388e2e1169a32ff35bde7ab039df
Parents: 6a05779
Author: Steve Blackmon (@steveblackmon) <sblackmon@apache.org>
Authored: Fri Jun 12 16:04:02 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sblackmon@apache.org>
Committed: Fri Jun 12 16:04:02 2015 -0500

----------------------------------------------------------------------
 .../streams/hdfs/WebHdfsPersistReaderTask.java    | 18 ++++++++++++++----
 .../apache/streams/hdfs/WebHdfsPersistWriter.java | 13 +++++++++++--
 .../apache/streams/hdfs/HdfsConfiguration.json    |  4 ++++
 .../streams/hdfs/HdfsWriterConfiguration.json     |  6 ++++++
 .../apache/streams/hdfs/test/TestHdfsPersist.java |  7 ++++---
 5 files changed, 39 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/78f0b4c1/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index 4d1c43d..e91d1e4 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -32,9 +32,11 @@ import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPInputStream;
 
 public class WebHdfsPersistReaderTask implements Runnable {
 
@@ -50,15 +52,23 @@ public class WebHdfsPersistReaderTask implements Runnable {
     public void run() {
 
         for( FileStatus fileStatus : reader.status ) {
+            InputStream inputStream;
+            InputStreamReader inputStreamReader;
             BufferedReader bufferedReader;
             LOGGER.info("Found " + fileStatus.getPath().getName());
             if( fileStatus.isFile() && !fileStatus.getPath().getName().startsWith("_")) {
-                LOGGER.info("Started Processing " + fileStatus.getPath().getName());
+                HdfsWriterConfiguration.Compression compression = HdfsWriterConfiguration.Compression.NONE;
+                if( fileStatus.getPath().getName().endsWith(".gz"))
+                    compression = HdfsWriterConfiguration.Compression.GZIP;
+                LOGGER.info("Started Processing: {} Encoding: {} Compression: {}", fileStatus.getPath().getName(), reader.hdfsConfiguration.getEncoding(), compression.toString());
                 try {
-                    bufferedReader = new BufferedReader(new InputStreamReader(reader.client.open(fileStatus.getPath())));
+                    inputStream = reader.client.open(fileStatus.getPath());
+                    if( compression.equals(HdfsWriterConfiguration.Compression.GZIP))
+                        inputStream = new GZIPInputStream(inputStream);
+                    inputStreamReader = new InputStreamReader(inputStream, reader.hdfsConfiguration.getEncoding());
+                    bufferedReader = new BufferedReader(inputStreamReader);
                 } catch (Exception e) {
-                    e.printStackTrace();
-                    LOGGER.error(e.getMessage());
+                    LOGGER.error("Exception Opening " + fileStatus.getPath(), e.getMessage());
                     return;
                 }
 

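The reader change above reduces to one pattern: pick a decompression wrapper from the file
extension, then apply the configured character encoding when building the reader. A minimal,
self-contained sketch of that pattern follows; the GzipAwareOpen class and its open helper are
illustrative names for this note, not part of the commit.

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.util.zip.GZIPInputStream;

    public class GzipAwareOpen {
        // Illustrative helper (hypothetical name): wrap the raw stream in
        // GZIPInputStream when the file name ends in ".gz", then build a reader
        // with the configured encoding (see the new "encoding" setting below).
        public static BufferedReader open(InputStream raw, String fileName, String encoding) throws IOException {
            InputStream in = fileName.endsWith(".gz") ? new GZIPInputStream(raw) : raw;
            return new BufferedReader(new InputStreamReader(in, encoding));
        }
    }
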
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/78f0b4c1/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 76d57a6..62f8d9e 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -46,6 +46,7 @@ import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
+import java.util.zip.GZIPOutputStream;
 
 public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable {
     public final static String STREAMS_ID = "WebHdfsPersistWriter";
@@ -208,7 +209,12 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
             return;
 
         // Create the path for where the file is going to live.
-        Path filePath = this.path.suffix("/" + hdfsConfiguration.getWriterFilePrefix() + "-" + new Date().getTime() + ".tsv");
+        Path filePath = this.path.suffix("/" + hdfsConfiguration.getWriterFilePrefix() + "-" + new Date().getTime());
+
+        if( hdfsConfiguration.getCompression().equals(HdfsWriterConfiguration.Compression.GZIP))
+            filePath = filePath.suffix(".gz");
+        else
+            filePath = filePath.suffix(".tsv");
 
         try {
 
@@ -224,7 +230,10 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
             if (client.exists(filePath))
                 throw new RuntimeException("Unable to create file: " + filePath);
 
-            this.currentWriter = new OutputStreamWriter(client.create(filePath));
+            if( hdfsConfiguration.getCompression().equals(HdfsWriterConfiguration.Compression.GZIP))
+                this.currentWriter = new OutputStreamWriter(new GZIPOutputStream(client.create(filePath)));
+            else
+                this.currentWriter = new OutputStreamWriter(client.create(filePath));
 
             // Add another file to the list of written files.
             writtenFiles.add(filePath);
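
On the write side the same idea applies in reverse: when gzip compression is configured, the
stream returned by client.create(filePath) is wrapped in a GZIPOutputStream before the
OutputStreamWriter is built. Note that the write path still uses the platform default charset;
the new encoding setting is only consumed on the read side in this commit. A minimal sketch,
with GzipAwareCreate and newWriter as hypothetical names for this note:

    import java.io.IOException;
    import java.io.OutputStream;
    import java.io.OutputStreamWriter;
    import java.util.zip.GZIPOutputStream;

    public class GzipAwareCreate {
        // Illustrative helper (hypothetical name): wrap the raw output stream in
        // GZIPOutputStream when gzip compression is requested.
        public static OutputStreamWriter newWriter(OutputStream raw, boolean gzip) throws IOException {
            OutputStream out = gzip ? new GZIPOutputStream(raw) : raw;
            return new OutputStreamWriter(out); // platform default charset, as in the diff
        }
    }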

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/78f0b4c1/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json b/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
index e6e1e4c..61245c4 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
+++ b/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
@@ -53,6 +53,10 @@
         "line_delimiter": {
           "type": "string",
           "default": "\n"
+        },
+        "encoding": {
+          "type": "string",
+          "default": "UTF-8"
         }
     }
 }
\ No newline at end of file
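
The new "encoding" property surfaces on the generated HdfsConfiguration bean as getEncoding(),
which the reader hunk above calls. A hedged usage sketch, assuming jsonschema2pojo also
generates the usual fluent withEncoding setter alongside the withScheme/withHost/withPath
setters already used in the test:

    HdfsConfiguration hdfsConfiguration = new HdfsConfiguration()
            .withScheme(HdfsConfiguration.Scheme.FILE)
            .withHost("localhost")
            .withPath("target/TestHdfsPersist")
            .withEncoding("UTF-8");   // withEncoding is assumed; "UTF-8" is also the schema default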

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/78f0b4c1/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json b/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json
index 29f2d3d..76d2fca 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json
+++ b/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json
@@ -21,6 +21,12 @@
             "type": "integer",
             "description": "Lines Per File",
             "default": 1000
+        },
+        "compression": {
+            "type": "string",
+            "description": "compression",
+            "enum" : ["none", "gzip"],
+            "default": "none"
         }
     }
 }
\ No newline at end of file
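
The "compression" enum maps to HdfsWriterConfiguration.Compression (NONE/GZIP), which the
writer and the updated test use. A short sketch of enabling gzip on a writer configuration,
assuming MAPPER is a Jackson ObjectMapper as in the test below:

    HdfsWriterConfiguration hdfsWriterConfiguration =
            MAPPER.convertValue(hdfsConfiguration, HdfsWriterConfiguration.class);
    hdfsWriterConfiguration.setCompression(HdfsWriterConfiguration.Compression.GZIP); // schema default is "none"
    hdfsWriterConfiguration.setWriterFilePrefix("activities");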

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/78f0b4c1/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java
index fe5a767..ff33ec3 100644
--- a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java
+++ b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java
@@ -75,11 +75,12 @@ public class TestHdfsPersist {
     public void TestHdfsPersistCase(List<String> fields) throws Exception {
 
         HdfsConfiguration hdfsConfiguration = new HdfsConfiguration().withScheme(HdfsConfiguration.Scheme.FILE).withHost("localhost").withUser("cloudera").withPath("target/TestHdfsPersist");
-        if( fields.size() > 0 )
-            hdfsConfiguration.setFields(fields);
+        hdfsConfiguration.setFields(fields);
         HdfsWriterConfiguration hdfsWriterConfiguration = MAPPER.convertValue(hdfsConfiguration, HdfsWriterConfiguration.class);
-        hdfsWriterConfiguration.setWriterPath(new Integer(fields.size()).toString());
+        if( fields.size() % 2 == 1 )
+            hdfsWriterConfiguration.setCompression(HdfsWriterConfiguration.Compression.GZIP);
         hdfsWriterConfiguration.setWriterFilePrefix("activities");
+        hdfsWriterConfiguration.setWriterPath(Integer.toString(fields.size()));
         WebHdfsPersistWriter writer = new WebHdfsPersistWriter(hdfsWriterConfiguration);
 
         writer.prepare(null);

