streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sblack...@apache.org
Subject git commit: Made linesPerFile configurable Made configurator more sensible
Date Thu, 11 Sep 2014 16:28:06 GMT
Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-163 [created] 669194f5c


Made linesPerFile configurable
Made configurator more sensible


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/669194f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/669194f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/669194f5

Branch: refs/heads/STREAMS-163
Commit: 669194f5cd8d59f518d7cdbcab5b66cee9fe56b0
Parents: 3ee5175
Author: Steve Blackmon <sblackmon@w2odigital.com>
Authored: Sun Sep 7 14:13:07 2014 -0500
Committer: Steve Blackmon <sblackmon@w2odigital.com>
Committed: Sun Sep 7 14:13:07 2014 -0500

----------------------------------------------------------------------
 .../apache/streams/hdfs/HdfsConfigurator.java   | 38 ++++++--------------
 .../streams/hdfs/WebHdfsPersistWriter.java      |  5 +--
 .../streams/hdfs/HdfsWriterConfiguration.json   |  5 +++
 3 files changed, 18 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/669194f5/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
index b000b85..c4823c3 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
@@ -20,6 +20,7 @@ package org.apache.streams.hdfs;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,45 +34,26 @@ public class HdfsConfigurator {
     private final static ObjectMapper mapper = new ObjectMapper();
 
     public static HdfsConfiguration detectConfiguration(Config hdfs) {
-        String host = hdfs.getString("host");
-        Long port = hdfs.getLong("port");
-        String path = hdfs.getString("path");
-        String user = hdfs.getString("user");
-        String password = hdfs.getString("password");
 
-        HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
-
-        hdfsConfiguration.setHost(host);
-        hdfsConfiguration.setPort(port);
-        hdfsConfiguration.setPath(path);
-        hdfsConfiguration.setUser(user);
-        hdfsConfiguration.setPassword(password);
+        HdfsConfiguration hdfsConfiguration = null;
 
+        try {
+            hdfsConfiguration = mapper.readValue(hdfs.root().render(ConfigRenderOptions.concise()), HdfsConfiguration.class);
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOGGER.warn("Could not parse HdfsConfiguration");
+        }
         return hdfsConfiguration;
     }
 
     public static HdfsReaderConfiguration detectReaderConfiguration(Config hdfs) {
 
-        HdfsConfiguration hdfsConfiguration = detectConfiguration(hdfs);
-        HdfsReaderConfiguration hdfsReaderConfiguration  = mapper.convertValue(hdfsConfiguration, HdfsReaderConfiguration.class);
-
-        String readerPath = hdfs.getString("readerPath");
-
-        hdfsReaderConfiguration.setReaderPath(readerPath);
-
-        return hdfsReaderConfiguration;
+        return mapper.convertValue(detectConfiguration(hdfs), HdfsReaderConfiguration.class);
     }
 
     public static HdfsWriterConfiguration detectWriterConfiguration(Config hdfs) {
 
-        HdfsConfiguration hdfsConfiguration = detectConfiguration(hdfs);
-        HdfsWriterConfiguration hdfsWriterConfiguration  = mapper.convertValue(hdfsConfiguration, HdfsWriterConfiguration.class);
-
-        String writerPath = hdfs.getString("writerPath");
-
-        hdfsWriterConfiguration.setWriterPath(writerPath);
-
-        return hdfsWriterConfiguration;
+        return mapper.convertValue(detectConfiguration(hdfs), HdfsWriterConfiguration.class);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/669194f5/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 3ab3f29..4b480ad 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -54,7 +54,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
     private FileSystem client;
     private Path path;
     private String filePart = "default";
-    private int linesPerFile = 1000;
+    private int linesPerFile;
     private int totalRecordsWritten = 0;
     private final List<Path> writtenFiles = new ArrayList<Path>();
     private int fileLineCounter = 0;
@@ -75,6 +75,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
 
     public WebHdfsPersistWriter(HdfsWriterConfiguration hdfsConfiguration) {
         this.hdfsConfiguration = hdfsConfiguration;
+        this.linesPerFile = hdfsConfiguration.getLinesPerFile().intValue();
     }
 
     public URI getURI() throws URISyntaxException {
@@ -253,7 +254,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
             return null;
         else
             return new StringBuilder()
-                    .append(entry.getSequenceid())
+                    .append(entry.getId())
                     .append(DELIMITER)
                     .append(entry.getTimestamp())
                     .append(DELIMITER)

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/669194f5/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json b/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json
index 5cadd7d..fedda38 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json
+++ b/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json
@@ -13,6 +13,11 @@
         "writerFilePrefix": {
             "type": "string",
             "description": "File Prefix"
+        },
+        "linesPerFile": {
+            "type": "integer",
+            "description": "Lines Per File",
+            "default": 1000
         }
     }
 }
\ No newline at end of file


Mime
View raw message