streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sblack...@apache.org
Subject [1/2] incubator-streams git commit: resolves STREAMS-329 #329
Date Fri, 19 Jun 2015 20:46:36 GMT
Repository: incubator-streams
Updated Branches:
  refs/heads/master bea7a504c -> 19b10b13f


resolves STREAMS-329 #329


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b3f6cb94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b3f6cb94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b3f6cb94

Branch: refs/heads/master
Commit: b3f6cb94d8a1f79300e90f7944a291ba9d351a5b
Parents: 6a05779
Author: Steve Blackmon (@steveblackmon) <sblackmon@apache.org>
Authored: Sun May 24 15:54:35 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sblackmon@apache.org>
Committed: Sun May 24 15:54:35 2015 -0500

----------------------------------------------------------------------
 .../streams-persist-elasticsearch/pom.xml       |  27 ++++
 .../MetadataFromDocumentProcessor.java          | 112 ++++++++++++++++
 .../test/TestMetadataFromDocumentProcessor.java | 133 +++++++++++++++++++
 3 files changed, 272 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b3f6cb94/streams-contrib/streams-persist-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/pom.xml b/streams-contrib/streams-persist-elasticsearch/pom.xml
index 8bc3f42..e830071 100644
--- a/streams-contrib/streams-persist-elasticsearch/pom.xml
+++ b/streams-contrib/streams-persist-elasticsearch/pom.xml
@@ -86,6 +86,14 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
             <groupId>org.elasticsearch</groupId>
             <artifactId>elasticsearch</artifactId>
             <scope>compile</scope>
@@ -132,6 +140,25 @@
     <build>
         <plugins>
             <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <version>2.4</version>
+                <executions>
+                    <execution>
+                        <id>resource-dependencies</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>unpack-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            <includeArtifactIds>streams-pojo</includeArtifactIds>
+                            <includes>**/*.json</includes>
+                            <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>build-helper-maven-plugin</artifactId>
                 <executions>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b3f6cb94/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java
new file mode 100644
index 0000000..7a9135d
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.elasticsearch.processor;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.elasticsearch.ElasticsearchMetadataUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Examines a datum's document to derive metadata fields.
+ *
+ * This is useful if you have a document with a populated 'id', and 'verb' or 'objectType' fields you want
+ * to use as _id and _type respectively when indexing.
+ *
+ * Derived values are stored in the datum's metadata under the keys 'id' and 'type', and the id is also
+ * applied to the datum itself. When both 'verb' and 'objectType' hold values, 'objectType' wins.
+ * Fields that are missing, empty, or not JSON strings are ignored, so values already on the datum
+ * are left untouched.
+ */
+public class MetadataFromDocumentProcessor implements StreamsProcessor, Serializable {
+
+    public final static String STREAMS_ID = "MetadataFromDocumentProcessor";
+
+    // transient: reacquired by prepare(), or lazily by process(), so serialized copies remain usable
+    private transient ObjectMapper mapper;
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MetadataFromDocumentProcessor.class);
+
+    public MetadataFromDocumentProcessor() {
+    }
+
+    /**
+     * Derives 'id' and 'type' metadata from the datum's document.
+     *
+     * If the document cannot be read as a JSON object, a warning is logged and the datum is passed
+     * through with its existing id; its metadata map is created if it was absent.
+     *
+     * @param entry datum whose document is a JSON String or an object convertible to a JSON object
+     * @return a single-element list containing {@code entry}
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+        if( mapper == null ) {
+            // process() may be reached before prepare(), after cleanUp(), or on a deserialized copy
+            prepare(null);
+        }
+
+        List<StreamsDatum> result = Lists.newArrayList();
+
+        Map<String, Object> metadata = entry.getMetadata();
+        if( metadata == null ) metadata = Maps.newHashMap();
+
+        String id = null;
+        String type = null;
+
+        ObjectNode objectNode = toObjectNode(entry.getDocument());
+        if( objectNode != null ) {
+            id = textField(objectNode, "id");
+            // 'objectType' takes precedence over 'verb', but only when it actually holds a value
+            String objectType = textField(objectNode, "objectType");
+            type = Strings.isNullOrEmpty(objectType) ? textField(objectNode, "verb") : objectType;
+        }
+
+        if( !Strings.isNullOrEmpty(id) ) {
+            metadata.put("id", id);
+            // Only replace the datum id when the document supplies one; never null out an existing id.
+            entry.setId(id);
+        }
+        if( !Strings.isNullOrEmpty(type) ) metadata.put("type", type);
+
+        entry.setMetadata(metadata);
+
+        result.add(entry);
+
+        return result;
+    }
+
+    /**
+     * Reads a document as a JSON object.
+     *
+     * @param document a JSON String, or any object the mapper can convert
+     * @return the document as an ObjectNode, or null if no JSON object could be obtained
+     */
+    private ObjectNode toObjectNode(Object document) {
+        if( document instanceof String ) {
+            try {
+                return mapper.readValue((String) document, ObjectNode.class);
+            } catch (IOException e) {
+                LOGGER.warn("Can't deserialize to determine metadata", e);
+                return null;
+            }
+        }
+        try {
+            return mapper.convertValue(document, ObjectNode.class);
+        } catch (Exception e) {
+            LOGGER.warn("Can't deserialize to determine metadata", e);
+            return null;
+        }
+    }
+
+    /** Returns the string value of a top-level field, or null if the field is missing or not a JSON string. */
+    private static String textField(ObjectNode node, String fieldName) {
+        return node.path(fieldName).textValue();
+    }
+
+    /**
+     * Acquires the Jackson mapper used to read documents.
+     *
+     * @param configurationObject not used
+     */
+    @Override
+    public void prepare(Object configurationObject) {
+        mapper = StreamsJacksonMapper.getInstance();
+        // NOTE(review): getInstance() looks like a shared mapper, so registering a module here
+        // presumably affects every other user of that instance — confirm this is intended.
+        mapper.registerModule(new JsonOrgModule());
+    }
+
+    /** Drops the mapper reference; a later process() call reacquires it. */
+    @Override
+    public void cleanUp() {
+        mapper = null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b3f6cb94/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java
new file mode 100644
index 0000000..baf386a
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.elasticsearch.test;
+
+import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.Lists;
+import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.Sets;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.processor.DocumentToMetadataProcessor;
+import org.apache.streams.elasticsearch.processor.MetadataFromDocumentProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.reflections.Reflections;
+import org.reflections.scanners.SubTypesScanner;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Tests for {@link MetadataFromDocumentProcessor}, run against the 'activities' test resources
+ * (presumably the JSON examples the pom unpacks from the streams-pojo test-jar into test-classes).
+ */
+public class TestMetadataFromDocumentProcessor {
+
+    private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TestMetadataFromDocumentProcessor.class);
+
+    @Before
+    public void prepareTest() {
+
+    }
+
+    @Test
+    public void testSerializability() {
+        MetadataFromDocumentProcessor processor = new MetadataFromDocumentProcessor();
+
+        MetadataFromDocumentProcessor clone = (MetadataFromDocumentProcessor) SerializationUtils.clone(processor);
+        Assert.assertNotNull(clone);
+    }
+
+    @Test
+    public void testMetadataFromDocumentProcessor() throws Exception {
+
+        MetadataFromDocumentProcessor processor = new MetadataFromDocumentProcessor();
+
+        processor.prepare(null);
+
+        InputStream testActivityFolderStream = TestMetadataFromDocumentProcessor.class.getClassLoader()
+                .getResourceAsStream("activities");
+        Assert.assertNotNull("test resource folder 'activities' not found on the classpath", testActivityFolderStream);
+        List<String> files;
+        try {
+            // Reading a directory resource as a list of file names relies on the resources
+            // being a file-system directory rather than a jar entry.
+            files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+        } finally {
+            IOUtils.closeQuietly(testActivityFolderStream);
+        }
+        // Guard against a vacuous pass when no example documents were found.
+        Assert.assertFalse("no test activities found", files.isEmpty());
+
+        Set<ActivityObject> objects = new HashSet<ActivityObject>();
+
+        for( String file : files) {
+            LOGGER.info("File: " + file );
+            InputStream testActivityFileStream = TestMetadataFromDocumentProcessor.class.getClassLoader()
+                    .getResourceAsStream("activities/" + file);
+            Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class);
+            // Use the verb as the id so every example document carries an 'id' field.
+            activity.setId(activity.getVerb());
+            activity.getAdditionalProperties().remove("$license");
+
+            if( activity.getActor().getObjectType() != null)
+                objects.add(activity.getActor());
+            if( activity.getObject().getObjectType() != null)
+                objects.add(activity.getObject());
+
+            assertMetadataDerived(processor, new StreamsDatum(activity));
+
+            LOGGER.info("valid: " + activity.getVerb() );
+        }
+
+        for( ActivityObject activityObject : objects) {
+            LOGGER.info("Object: " + MAPPER.writeValueAsString(activityObject));
+
+            // Use the objectType as the id so every object carries an 'id' field.
+            activityObject.setId(activityObject.getObjectType());
+            assertMetadataDerived(processor, new StreamsDatum(activityObject));
+
+            LOGGER.info("valid: " + activityObject.getObjectType() );
+        }
+
+        processor.cleanUp();
+    }
+
+    /**
+     * Runs the processor on one datum and checks that exactly one datum comes back,
+     * carrying an id and both 'id' and 'type' metadata.
+     */
+    private static void assertMetadataDerived(MetadataFromDocumentProcessor processor, StreamsDatum datum) {
+        List<StreamsDatum> resultList = processor.process(datum);
+        Assert.assertNotNull(resultList);
+        Assert.assertEquals(1, resultList.size());
+
+        StreamsDatum result = resultList.get(0);
+        Assert.assertNotNull(result);
+        Assert.assertNotNull(result.getDocument());
+        Assert.assertNotNull(result.getId());
+        Assert.assertNotNull(result.getMetadata());
+        Assert.assertNotNull(result.getMetadata().get("id"));
+        Assert.assertNotNull(result.getMetadata().get("type"));
+    }
+}


Mime
View raw message