streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sblack...@apache.org
Subject [3/4] incubator-streams git commit: added tests and fixes to make tests work
Date Fri, 20 Mar 2015 00:54:16 GMT
added tests and fixes to make tests work


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0d953487
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0d953487
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0d953487

Branch: refs/heads/master
Commit: 0d953487f8ecb93706759863f4fe65d3f3306289
Parents: 8d2b3e2
Author: sblackmon <sblackmon@apache.org>
Authored: Wed Mar 11 11:57:55 2015 -0700
Committer: sblackmon <sblackmon@apache.org>
Committed: Wed Mar 11 11:57:55 2015 -0700

----------------------------------------------------------------------
 streams-contrib/streams-persist-hdfs/pom.xml    |  25 ++++
 .../streams/hdfs/WebHdfsPersistReader.java      |   7 +-
 .../streams/hdfs/WebHdfsPersistReaderTask.java  |  33 ++---
 .../streams/hdfs/WebHdfsPersistWriter.java      |   6 +-
 .../streams/hdfs/test/TestHdfsPersist.java      | 119 +++++++++++++++++++
 5 files changed, 172 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0d953487/streams-contrib/streams-persist-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/pom.xml b/streams-contrib/streams-persist-hdfs/pom.xml
index 9bf86ed..0946bfb 100644
--- a/streams-contrib/streams-persist-hdfs/pom.xml
+++ b/streams-contrib/streams-persist-hdfs/pom.xml
@@ -55,6 +55,12 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
             <version>${hdfs.version}</version>
@@ -130,6 +136,25 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <version>2.4</version>
+                <executions>
+                    <execution>
+                        <id>resource-dependencies</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>unpack-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            <includeArtifactIds>streams-pojo</includeArtifactIds>
+                            <includes>**/*.json</includes>
+                            <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0d953487/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index 02345a2..08f78cf 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -172,7 +172,12 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
 
     @Override
     public StreamsResultSet readAll() {
-        startStream();
+        WebHdfsPersistReaderTask readerTask = new WebHdfsPersistReaderTask(this);
+        Thread readerThread = new Thread(readerTask);
+        readerThread.start();
+        try {
+            readerThread.join();
+        } catch (InterruptedException e) {}
         return new StreamsResultSet(persistQueue);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0d953487/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index 3f9b906..f3877e3 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -106,38 +106,39 @@ public class WebHdfsPersistReaderTask implements Runnable {
 
     private StreamsDatum processLine(String line) {
 
-        StreamsDatum datum;
-
         String[] fields = line.split(reader.hdfsConfiguration.getFieldDelimiter());
 
         if( fields.length == 0)
             return null;
 
-        String id;
-        DateTime ts;
-        Map<String, Object> metadata;
-        String json;
+        String id = null;
+        DateTime ts = null;
+        Map<String, Object> metadata = null;
+        String json = null;
 
-        if( reader.hdfsConfiguration.getFields().contains( HdfsConstants.DOC )) {
+        if( reader.hdfsConfiguration.getFields().contains( HdfsConstants.DOC )
+            && fields.length > reader.hdfsConfiguration.getFields().indexOf(HdfsConstants.DOC)) {
             json = fields[reader.hdfsConfiguration.getFields().indexOf(HdfsConstants.DOC)];
-            datum = new StreamsDatum(json);
-        } else {
-            return null;
         }
 
-        if( reader.hdfsConfiguration.getFields().contains( HdfsConstants.ID ) ) {
+        if( reader.hdfsConfiguration.getFields().contains( HdfsConstants.ID )
+            && fields.length > reader.hdfsConfiguration.getFields().indexOf(HdfsConstants.ID)) {
             id = fields[reader.hdfsConfiguration.getFields().indexOf(HdfsConstants.ID)];
-            datum.setId(id);
         }
-        if( reader.hdfsConfiguration.getFields().contains( HdfsConstants.TS )) {
+        if( reader.hdfsConfiguration.getFields().contains( HdfsConstants.TS )
+            && fields.length > reader.hdfsConfiguration.getFields().indexOf(HdfsConstants.TS)) {
             ts = parseTs(fields[reader.hdfsConfiguration.getFields().indexOf(HdfsConstants.TS)]);
-            datum.setTimestamp(ts);
         }
-        if( reader.hdfsConfiguration.getFields().contains( HdfsConstants.META )) {
+        if( reader.hdfsConfiguration.getFields().contains( HdfsConstants.META )
+            && fields.length > reader.hdfsConfiguration.getFields().indexOf(HdfsConstants.META)) {
             metadata = parseMap(fields[reader.hdfsConfiguration.getFields().indexOf(HdfsConstants.META)]);
-            datum.setMetadata(metadata);
         }
 
+        StreamsDatum datum = new StreamsDatum(json);
+        datum.setId(id);
+        datum.setTimestamp(ts);
+        datum.setMetadata(metadata);
+
         return datum;
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0d953487/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 38ded28..76d57a6 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.streams.core.*;
+import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -274,7 +275,10 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
                 else if( field.equals(HdfsConstants.ID) )
                     fielddata.add(entry.getId());
                 else if( field.equals(HdfsConstants.TS) )
-                    fielddata.add(entry.getTimestamp().toString());
+                    if( entry.getTimestamp() != null )
+                        fielddata.add(entry.getTimestamp().toString());
+                    else
+                        fielddata.add(DateTime.now().toString());
                 else if( field.equals(HdfsConstants.META) )
                     fielddata.add(metadataJson);
                 else if( entry.getMetadata().containsKey(field)) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0d953487/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java
new file mode 100644
index 0000000..fe5a767
--- /dev/null
+++ b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.hdfs.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.hdfs.HdfsConfiguration;
+import org.apache.streams.hdfs.HdfsReaderConfiguration;
+import org.apache.streams.hdfs.HdfsWriterConfiguration;
+import org.apache.streams.hdfs.WebHdfsPersistReader;
+import org.apache.streams.hdfs.WebHdfsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test reading and writing documents
+ */
+public class TestHdfsPersist {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TestHdfsPersist.class);
+
+    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+    @Before
+    public void setup() {
+        File file = new File("/target/TestHdfsPersist/");
+        if( file.exists())
+            file.delete();
+    }
+
+    @Test
+    public void TestHdfsPersist() throws Exception {
+
+        List<List<String>> fieldArrays = Lists.newArrayList();
+        fieldArrays.add(new ArrayList<String>());
+        fieldArrays.add(Lists.newArrayList("ID"));
+        fieldArrays.add(Lists.newArrayList("ID", "DOC"));
+        fieldArrays.add(Lists.newArrayList("ID", "TS", "DOC"));
+        fieldArrays.add(Lists.newArrayList("ID", "TS", "META", "DOC"));
+
+        for( List<String> fields : fieldArrays )
+            TestHdfsPersistCase(fields);
+
+    }
+
+    public void TestHdfsPersistCase(List<String> fields) throws Exception {
+
+        HdfsConfiguration hdfsConfiguration = new HdfsConfiguration().withScheme(HdfsConfiguration.Scheme.FILE).withHost("localhost").withUser("cloudera").withPath("target/TestHdfsPersist");
+        if( fields.size() > 0 )
+            hdfsConfiguration.setFields(fields);
+        HdfsWriterConfiguration hdfsWriterConfiguration = MAPPER.convertValue(hdfsConfiguration, HdfsWriterConfiguration.class);
+        hdfsWriterConfiguration.setWriterPath(new Integer(fields.size()).toString());
+        hdfsWriterConfiguration.setWriterFilePrefix("activities");
+        WebHdfsPersistWriter writer = new WebHdfsPersistWriter(hdfsWriterConfiguration);
+
+        writer.prepare(null);
+
+        InputStream testActivityFolderStream = TestHdfsPersist.class.getClassLoader()
+                .getResourceAsStream("activities");
+        List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+
+        int count = 0;
+
+        for( String file : files) {
+            LOGGER.info("File: " + file );
+            InputStream testActivityFileStream = TestHdfsPersist.class.getClassLoader()
+                    .getResourceAsStream("activities/" + file);
+            Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class);
+            activity.getAdditionalProperties().remove("$license");
+            StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+            writer.write( datum );
+            LOGGER.info("Wrote: " + activity.getVerb() );
+            count++;
+        }
+
+        writer.cleanUp();
+
+        HdfsReaderConfiguration hdfsReaderConfiguration = MAPPER.convertValue(hdfsConfiguration, HdfsReaderConfiguration.class);
+
+        WebHdfsPersistReader reader = new WebHdfsPersistReader(hdfsReaderConfiguration);
+        hdfsReaderConfiguration.setReaderPath(new Integer(fields.size()).toString());
+
+        reader.prepare(null);
+
+        StreamsResultSet resultSet = reader.readAll();
+
+        assert( resultSet.size() == count);
+
+    }
+}


Mime
View raw message