added tests and fixes to make tests work
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0d953487
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0d953487
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0d953487
Branch: refs/heads/master
Commit: 0d953487f8ecb93706759863f4fe65d3f3306289
Parents: 8d2b3e2
Author: sblackmon <sblackmon@apache.org>
Authored: Wed Mar 11 11:57:55 2015 -0700
Committer: sblackmon <sblackmon@apache.org>
Committed: Wed Mar 11 11:57:55 2015 -0700
----------------------------------------------------------------------
streams-contrib/streams-persist-hdfs/pom.xml | 25 ++++
.../streams/hdfs/WebHdfsPersistReader.java | 7 +-
.../streams/hdfs/WebHdfsPersistReaderTask.java | 33 ++---
.../streams/hdfs/WebHdfsPersistWriter.java | 6 +-
.../streams/hdfs/test/TestHdfsPersist.java | 119 +++++++++++++++++++
5 files changed, 172 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0d953487/streams-contrib/streams-persist-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/pom.xml b/streams-contrib/streams-persist-hdfs/pom.xml
index 9bf86ed..0946bfb 100644
--- a/streams-contrib/streams-persist-hdfs/pom.xml
+++ b/streams-contrib/streams-persist-hdfs/pom.xml
@@ -55,6 +55,13 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hdfs.version}</version>
@@ -130,6 +137,26 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <id>resource-dependencies</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>unpack-dependencies</goal>
+ </goals>
+ <configuration>
+ <!-- Unpack *.json files from the streams-pojo artifacts into test-classes. -->
+ <includeArtifactIds>streams-pojo</includeArtifactIds>
+ <includes>**/*.json</includes>
+ <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0d953487/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index 02345a2..08f78cf 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -172,7 +172,16 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
@Override
public StreamsResultSet readAll() {
- startStream();
+ WebHdfsPersistReaderTask readerTask = new WebHdfsPersistReaderTask(this);
+ Thread readerThread = new Thread(readerTask);
+ readerThread.start();
+ // Wait for the task to finish before wrapping persistQueue in a result set.
+ try {
+ readerThread.join();
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag instead of swallowing it; the result set may then be incomplete.
+ Thread.currentThread().interrupt();
+ }
return new StreamsResultSet(persistQueue);
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0d953487/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index 3f9b906..f3877e3 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -106,38 +106,42 @@ public class WebHdfsPersistReaderTask implements Runnable {
private StreamsDatum processLine(String line) {
- StreamsDatum datum;
-
String[] fields = line.split(reader.hdfsConfiguration.getFieldDelimiter());
if( fields.length == 0)
return null;
- String id;
- DateTime ts;
- Map<String, Object> metadata;
- String json;
+ // Every column is optional: it is read only when the configured field list names it
+ // and this line actually has that many columns; anything not read stays null.
+ String id = null;
+ DateTime ts = null;
+ Map<String, Object> metadata = null;
+ String json = null;
- if( reader.hdfsConfiguration.getFields().contains( HdfsConstants.DOC )) {
+ if( reader.hdfsConfiguration.getFields().contains( HdfsConstants.DOC )
+ && fields.length > reader.hdfsConfiguration.getFields().indexOf(HdfsConstants.DOC)) {
json = fields[reader.hdfsConfiguration.getFields().indexOf(HdfsConstants.DOC)];
- datum = new StreamsDatum(json);
- } else {
- return null;
}
- if( reader.hdfsConfiguration.getFields().contains( HdfsConstants.ID ) ) {
+ if( reader.hdfsConfiguration.getFields().contains( HdfsConstants.ID )
+ && fields.length > reader.hdfsConfiguration.getFields().indexOf(HdfsConstants.ID)) {
id = fields[reader.hdfsConfiguration.getFields().indexOf(HdfsConstants.ID)];
- datum.setId(id);
}
- if( reader.hdfsConfiguration.getFields().contains( HdfsConstants.TS )) {
+ if( reader.hdfsConfiguration.getFields().contains( HdfsConstants.TS )
+ && fields.length > reader.hdfsConfiguration.getFields().indexOf(HdfsConstants.TS)) {
ts = parseTs(fields[reader.hdfsConfiguration.getFields().indexOf(HdfsConstants.TS)]);
- datum.setTimestamp(ts);
}
- if( reader.hdfsConfiguration.getFields().contains( HdfsConstants.META )) {
+ if( reader.hdfsConfiguration.getFields().contains( HdfsConstants.META )
+ && fields.length > reader.hdfsConfiguration.getFields().indexOf(HdfsConstants.META)) {
metadata = parseMap(fields[reader.hdfsConfiguration.getFields().indexOf(HdfsConstants.META)]);
- datum.setMetadata(metadata);
}
+ // A datum is built even when no DOC column was read, in which case json is null.
+ StreamsDatum datum = new StreamsDatum(json);
+ datum.setId(id);
+ datum.setTimestamp(ts);
+ datum.setMetadata(metadata);
+
return datum;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0d953487/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 38ded28..76d57a6 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.streams.core.*;
+import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -274,7 +275,13 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
else if( field.equals(HdfsConstants.ID) )
fielddata.add(entry.getId());
- else if( field.equals(HdfsConstants.TS) )
- fielddata.add(entry.getTimestamp().toString());
+ else if( field.equals(HdfsConstants.TS) ) {
+ // NOTE(review): a datum without a timestamp is written with the current time, which the
+ // reader later parses back as if it were the datum's own timestamp.
+ if( entry.getTimestamp() != null )
+ fielddata.add(entry.getTimestamp().toString());
+ else
+ fielddata.add(DateTime.now().toString());
+ }
else if( field.equals(HdfsConstants.META) )
fielddata.add(metadataJson);
else if( entry.getMetadata().containsKey(field)) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0d953487/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java
new file mode 100644
index 0000000..fe5a767
--- /dev/null
+++ b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.hdfs.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.hdfs.HdfsConfiguration;
+import org.apache.streams.hdfs.HdfsReaderConfiguration;
+import org.apache.streams.hdfs.HdfsWriterConfiguration;
+import org.apache.streams.hdfs.WebHdfsPersistReader;
+import org.apache.streams.hdfs.WebHdfsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Round-trip test for the HDFS persist writer and reader.
+ *
+ * For each field layout, writes every activity JSON file listed under the
+ * {@code activities} classpath folder, reads the output back with
+ * {@link WebHdfsPersistReader#readAll()}, and checks that one datum is
+ * returned per document written.
+ */
+public class TestHdfsPersist {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TestHdfsPersist.class);
+
+    /** Output root for every case; relative, so it resolves under the module's target/ directory. */
+    private static final String BASE_PATH = "target/TestHdfsPersist";
+
+    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+    /**
+     * Deletes output from earlier runs so every run starts from an empty directory.
+     * NOTE(review): assumes the reader picks up every file under its path, in which
+     * case leftovers would inflate the count read back.
+     */
+    @Before
+    public void setup() {
+        // File.delete() cannot remove a non-empty directory; deleteQuietly recurses.
+        FileUtils.deleteQuietly(new File(BASE_PATH));
+    }
+
+    @Test
+    public void TestHdfsPersist() throws Exception {
+
+        List<List<String>> fieldArrays = Lists.newArrayList();
+        fieldArrays.add(new ArrayList<String>());
+        fieldArrays.add(Lists.newArrayList("ID"));
+        fieldArrays.add(Lists.newArrayList("ID", "DOC"));
+        fieldArrays.add(Lists.newArrayList("ID", "TS", "DOC"));
+        fieldArrays.add(Lists.newArrayList("ID", "TS", "META", "DOC"));
+
+        for( List<String> fields : fieldArrays )
+            TestHdfsPersistCase(fields);
+
+    }
+
+    /**
+     * Writes every test activity with the given field layout, reads the output
+     * back, and asserts that as many datums were read as were written.
+     *
+     * @param fields field layout; an empty list leaves the configuration's fields unset
+     */
+    public void TestHdfsPersistCase(List<String> fields) throws Exception {
+
+        // Each layout writes to and reads from its own sub-path, named by its field count.
+        String casePath = Integer.toString(fields.size());
+
+        HdfsConfiguration hdfsConfiguration = new HdfsConfiguration().withScheme(HdfsConfiguration.Scheme.FILE).withHost("localhost").withUser("cloudera").withPath(BASE_PATH);
+        if( fields.size() > 0 )
+            hdfsConfiguration.setFields(fields);
+        HdfsWriterConfiguration hdfsWriterConfiguration = MAPPER.convertValue(hdfsConfiguration, HdfsWriterConfiguration.class);
+        hdfsWriterConfiguration.setWriterPath(casePath);
+        hdfsWriterConfiguration.setWriterFilePrefix("activities");
+        WebHdfsPersistWriter writer = new WebHdfsPersistWriter(hdfsWriterConfiguration);
+
+        writer.prepare(null);
+
+        List<String> files;
+        InputStream testActivityFolderStream = TestHdfsPersist.class.getClassLoader()
+                .getResourceAsStream("activities");
+        try {
+            files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+        } finally {
+            IOUtils.closeQuietly(testActivityFolderStream);
+        }
+
+        int count = 0;
+
+        for( String file : files) {
+            LOGGER.info("File: {}", file);
+            InputStream testActivityFileStream = TestHdfsPersist.class.getClassLoader()
+                    .getResourceAsStream("activities/" + file);
+            Activity activity;
+            try {
+                activity = MAPPER.readValue(testActivityFileStream, Activity.class);
+            } finally {
+                IOUtils.closeQuietly(testActivityFileStream);
+            }
+            // Drop the "$license" property the sample JSON carries before persisting.
+            activity.getAdditionalProperties().remove("$license");
+            StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+            writer.write( datum );
+            LOGGER.info("Wrote: {}", activity.getVerb());
+            count++;
+        }
+
+        writer.cleanUp();
+
+        HdfsReaderConfiguration hdfsReaderConfiguration = MAPPER.convertValue(hdfsConfiguration, HdfsReaderConfiguration.class);
+        // Finish configuring before handing the configuration to the reader.
+        hdfsReaderConfiguration.setReaderPath(casePath);
+
+        WebHdfsPersistReader reader = new WebHdfsPersistReader(hdfsReaderConfiguration);
+
+        reader.prepare(null);
+
+        StreamsResultSet resultSet = reader.readAll();
+
+        // Unlike the assert keyword, this still fails when the JVM runs without -ea.
+        assertEquals("datums read back for fields " + fields, count, resultSet.size());
+
+    }
+}
|