Repository: incubator-streams
Updated Branches:
refs/heads/master bea7a504c -> 19b10b13f
resolves STREAMS-329 #329
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b3f6cb94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b3f6cb94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b3f6cb94
Branch: refs/heads/master
Commit: b3f6cb94d8a1f79300e90f7944a291ba9d351a5b
Parents: 6a05779
Author: Steve Blackmon (@steveblackmon) <sblackmon@apache.org>
Authored: Sun May 24 15:54:35 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sblackmon@apache.org>
Committed: Sun May 24 15:54:35 2015 -0500
----------------------------------------------------------------------
.../streams-persist-elasticsearch/pom.xml | 27 ++++
.../MetadataFromDocumentProcessor.java | 112 ++++++++++++++++
.../test/TestMetadataFromDocumentProcessor.java | 133 +++++++++++++++++++
3 files changed, 272 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b3f6cb94/streams-contrib/streams-persist-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/pom.xml b/streams-contrib/streams-persist-elasticsearch/pom.xml
index 8bc3f42..e830071 100644
--- a/streams-contrib/streams-persist-elasticsearch/pom.xml
+++ b/streams-contrib/streams-persist-elasticsearch/pom.xml
@@ -86,6 +86,14 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<scope>compile</scope>
@@ -132,6 +140,25 @@
<build>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <id>resource-dependencies</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>unpack-dependencies</goal>
+ </goals>
+ <configuration>
+ <includeArtifactIds>streams-pojo</includeArtifactIds>
+ <includes>**/*.json</includes>
+ <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b3f6cb94/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java
new file mode 100644
index 0000000..7a9135d
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.elasticsearch.processor;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.elasticsearch.ElasticsearchMetadataUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Examines a document to derive Elasticsearch metadata fields.
+ *
+ * This is useful if you have a document with a populated 'id', and 'verb' or 'objectType' fields
+ * you want to use as _id and _type respectively when indexing.
+ *
+ * Derived values are stored in the datum's metadata under the keys "id" and "type". A non-empty
+ * 'id' also replaces the datum id; when both 'verb' and 'objectType' hold text, 'objectType' wins.
+ * String documents are parsed as JSON; other documents are converted with the Jackson mapper.
+ */
+public class MetadataFromDocumentProcessor implements StreamsProcessor, Serializable {
+
+    public final static String STREAMS_ID = "MetadataFromDocumentProcessor";
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MetadataFromDocumentProcessor.class);
+
+    // transient: the mapper is (re)created in prepare(), so a configured mapper never needs to
+    // travel with a serialized processor.
+    private transient ObjectMapper mapper;
+
+    public MetadataFromDocumentProcessor() {
+    }
+
+    /**
+     * Derives 'id' and 'type' metadata from the datum's document.
+     *
+     * Requires {@link #prepare(Object)} to have been called first.
+     *
+     * @param entry the datum to inspect; a metadata map is created if it has none
+     * @return a single-element list holding the same, updated datum
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+        List<StreamsDatum> result = Lists.newArrayList();
+
+        Map<String, Object> metadata = entry.getMetadata();
+        if (metadata == null) {
+            metadata = Maps.newHashMap();
+        }
+
+        String id = null;
+        String type = null;
+
+        ObjectNode objectNode = toObjectNode(entry.getDocument());
+        if (objectNode != null) {
+            id = scalarText(objectNode, "id");
+            // 'objectType' takes precedence over 'verb', but only when it actually holds a value.
+            type = scalarText(objectNode, "objectType");
+            if (Strings.isNullOrEmpty(type)) {
+                type = scalarText(objectNode, "verb");
+            }
+        }
+
+        if (!Strings.isNullOrEmpty(id)) {
+            metadata.put("id", id);
+            // Only replace the datum id when the document supplies one, so an id the datum
+            // already carried is not clobbered with null.
+            entry.setId(id);
+        }
+        if (!Strings.isNullOrEmpty(type)) {
+            metadata.put("type", type);
+        }
+
+        entry.setMetadata(metadata);
+
+        result.add(entry);
+
+        return result;
+    }
+
+    /**
+     * Turns a document into an ObjectNode, or returns null (after logging a warning) if it
+     * cannot be represented as a JSON object.
+     */
+    private ObjectNode toObjectNode(Object document) {
+        if (document instanceof String) {
+            try {
+                return mapper.readValue((String) document, ObjectNode.class);
+            } catch (IOException e) {
+                LOGGER.warn("Can't deserialize to determine metadata", e);
+                return null;
+            }
+        }
+        try {
+            // convertValue reports conversion failures as IllegalArgumentException.
+            return mapper.convertValue(document, ObjectNode.class);
+        } catch (IllegalArgumentException e) {
+            LOGGER.warn("Can't deserialize to determine metadata", e);
+            return null;
+        }
+    }
+
+    /**
+     * Returns the text of a scalar field, or null if the field is absent, JSON null, or an
+     * object/array. Numbers and booleans are rendered as text, so numeric ids are kept.
+     */
+    private static String scalarText(ObjectNode node, String field) {
+        JsonNode value = node.get(field);
+        if (value == null || value.isNull() || !value.isValueNode()) {
+            return null;
+        }
+        return value.asText();
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        mapper = StreamsJacksonMapper.getInstance();
+        // NOTE(review): getInstance() presumably returns a shared mapper, in which case this
+        // registration affects every other user of it as well -- confirm that is intended.
+        mapper.registerModule(new JsonOrgModule());
+    }
+
+    @Override
+    public void cleanUp() {
+        mapper = null;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b3f6cb94/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java
new file mode 100644
index 0000000..baf386a
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.elasticsearch.test;
+
+import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.Lists;
+import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.Sets;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.processor.DocumentToMetadataProcessor;
+import org.apache.streams.elasticsearch.processor.MetadataFromDocumentProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.reflections.Reflections;
+import org.reflections.scanners.SubTypesScanner;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Created by sblackmon on 10/20/14.
+ */
+public class TestMetadataFromDocumentProcessor {
+
+ private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(TestMetadataFromDocumentProcessor.class);
+
+ @Before
+ public void prepareTest() {
+
+ }
+
+ @Test
+ public void testSerializability() {
+ MetadataFromDocumentProcessor processor = new MetadataFromDocumentProcessor();
+
+ MetadataFromDocumentProcessor clone = (MetadataFromDocumentProcessor) SerializationUtils.clone(processor);
+ }
+
+ @Test
+ public void testMetadataFromDocumentProcessor() throws Exception {
+
+ MetadataFromDocumentProcessor processor = new MetadataFromDocumentProcessor();
+
+ processor.prepare(null);
+
+ InputStream testActivityFolderStream = TestMetadataFromDocumentProcessor.class.getClassLoader()
+ .getResourceAsStream("activities");
+ List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+
+ Set<ActivityObject> objects = Sets.newHashSet();
+
+ for( String file : files) {
+ LOGGER.info("File: " + file );
+ InputStream testActivityFileStream = TestMetadataFromDocumentProcessor.class.getClassLoader()
+ .getResourceAsStream("activities/" + file);
+ Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class);
+ activity.setId(activity.getVerb());
+ activity.getAdditionalProperties().remove("$license");
+
+ if( activity.getActor().getObjectType() != null)
+ objects.add(activity.getActor());
+ if( activity.getObject().getObjectType() != null)
+ objects.add(activity.getObject());
+
+ StreamsDatum datum = new StreamsDatum(activity);
+
+ List<StreamsDatum> resultList = processor.process(datum);
+ assert(resultList != null);
+ assert(resultList.size() == 1);
+
+ StreamsDatum result = resultList.get(0);
+ assert(result != null);
+ assert(result.getDocument() != null);
+ assert(result.getId() != null);
+ assert(result.getMetadata() != null);
+ assert(result.getMetadata().get("id") != null);
+ assert(result.getMetadata().get("type") != null);
+
+ LOGGER.info("valid: " + activity.getVerb() );
+ }
+
+ for( ActivityObject activityObject : objects) {
+ LOGGER.info("Object: " + MAPPER.writeValueAsString(activityObject));
+
+ activityObject.setId(activityObject.getObjectType());
+ StreamsDatum datum = new StreamsDatum(activityObject);
+
+ List<StreamsDatum> resultList = processor.process(datum);
+ assert(resultList != null);
+ assert(resultList.size() == 1);
+
+ StreamsDatum result = resultList.get(0);
+ assert(result != null);
+ assert(result.getDocument() != null);
+ assert(result.getId() != null);
+ assert(result.getMetadata() != null);
+ assert(result.getMetadata().get("id") != null);
+ assert(result.getMetadata().get("type") != null);
+
+ LOGGER.info("valid: " + activityObject.getObjectType() );
+ }
+ }
+}
|