streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sblack...@apache.org
Subject [1/3] incubator-streams git commit: resolves STREAMS-333 #333 added parent support to EsUpdater more comprehensive testing
Date Fri, 19 Jun 2015 21:43:57 GMT
Repository: incubator-streams
Updated Branches:
  refs/heads/master 0db84e4fb -> e7bf641d3


resolves STREAMS-333 #333
added parent support to EsUpdater
more comprehensive testing


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f0bf5937
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f0bf5937
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f0bf5937

Branch: refs/heads/master
Commit: f0bf5937a739847a299b05b0309d7b72d58268a0
Parents: 701165f
Author: Steve Blackmon (@steveblackmon) <sblackmon@apache.org>
Authored: Mon Jun 1 09:02:21 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sblackmon@apache.org>
Committed: Mon Jun 1 09:02:21 2015 -0500

----------------------------------------------------------------------
 pom.xml                                         |   2 +
 .../streams-persist-elasticsearch/pom.xml       |  49 ++++-
 .../ElasticsearchPersistUpdater.java            |  18 +-
 .../ElasticsearchPersistWriter.java             |  26 ++-
 .../test/TestDatumFromMetadataProcessorIT.java  |  99 ++++++++++
 .../test/TestElasticsearchPersistWriter.java    |  88 ---------
 .../test/TestElasticsearchPersistWriterIT.java  | 197 +++++++++++++++++++
 ...ElasticsearchPersistWriterParentChildIT.java | 183 +++++++++++++++++
 .../resources/ActivityChildObjectParent.json    |  21 ++
 9 files changed, 581 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f0bf5937/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a67e0e0..077f8de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -173,6 +173,8 @@
         </release.repository.url>
 
         <!-- Plugin and Plugin Dependency Versions -->
+
+        <clean.plugin.version>2.6</clean.plugin.version>
         <compiler.plugin.version>3.0</compiler.plugin.version>
         <failsafe.plugin.version>2.17</failsafe.plugin.version>
         <surefire.plugin.version>2.17</surefire.plugin.version>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f0bf5937/streams-contrib/streams-persist-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/pom.xml b/streams-contrib/streams-persist-elasticsearch/pom.xml
index 8bc3f42..37d74d7 100644
--- a/streams-contrib/streams-persist-elasticsearch/pom.xml
+++ b/streams-contrib/streams-persist-elasticsearch/pom.xml
@@ -56,11 +56,18 @@
             <optional>true</optional>
         </dependency>
         <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-all</artifactId>
             <scope>test</scope>
         </dependency>
-       <dependency>
+        <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-config</artifactId>
             <version>${project.version}</version>
@@ -96,6 +103,10 @@
             <artifactId>json</artifactId>
             <version>${orgjson.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+        </dependency>
     </dependencies>
     <dependencyManagement>
         <dependencies>
@@ -132,6 +143,42 @@
     <build>
         <plugins>
             <plugin>
+                <artifactId>maven-clean-plugin</artifactId>
+                <version>${clean.plugin.version}</version>
+                <configuration>
+                    <filesets>
+                        <fileset>
+                            <directory>src/site/resources</directory>
+                            <followSymlinks>false</followSymlinks>
+                        </fileset>
+                        <!-- this is here because elasticsearch integration tests don't
have a setting to change directory where temp index files get created -->
+                        <fileset>
+                            <directory>data</directory>
+                            <followSymlinks>false</followSymlinks>
+                        </fileset>
+                    </filesets>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <version>2.4</version>
+                <executions>
+                    <execution>
+                        <id>resource-dependencies</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>unpack-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            <includeArtifactIds>streams-pojo</includeArtifactIds>
+                            <includes>**/*.json</includes>
+                            <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>build-helper-maven-plugin</artifactId>
                 <executions>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f0bf5937/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index 32aa0eb..e142908 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -19,6 +19,7 @@
 package org.apache.streams.elasticsearch;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
 import org.elasticsearch.action.update.UpdateRequest;
@@ -54,22 +55,22 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter
impl
         String index = ElasticsearchMetadataUtil.getIndex(metadata, config);
         String type = ElasticsearchMetadataUtil.getType(metadata, config);
         String id = ElasticsearchMetadataUtil.getId(streamsDatum);
+        String parent = ElasticsearchMetadataUtil.getParent(streamsDatum);
 
-        String json;
         try {
 
-            json = OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
+            String docAsJson = docAsJson(streamsDatum.getDocument());
 
-            LOGGER.debug("Attempt Update: ({},{},{}) {}", index, type, id, json);
+            LOGGER.debug("Attempt Update: ({},{},{},{}) {}", index, type, id, parent, docAsJson);
 
-            update(index, type, id, json);
+            update(index, type, id, parent, docAsJson);
 
         } catch (Throwable e) {
             LOGGER.warn("Unable to Update Document in ElasticSearch: {}", e.getMessage());
         }
     }
 
-    public void update(String indexName, String type, String id, String json) {
+    public void update(String indexName, String type, String id, String parent, String json)
{
         UpdateRequest updateRequest;
 
         Preconditions.checkNotNull(id);
@@ -82,8 +83,13 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter
impl
                 .id(id)
                 .doc(json);
 
+        if(!Strings.isNullOrEmpty(parent)) {
+            updateRequest = updateRequest.parent(parent);
+            updateRequest = updateRequest.routing(parent);
+        }
+
         // add fields
-        updateRequest.docAsUpsert(true);
+        //updateRequest.docAsUpsert(true);
 
         add(updateRequest);
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f0bf5937/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index c984a12..4b41bff 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -153,24 +153,34 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter,
DatumSt
         String parent = ElasticsearchMetadataUtil.getParent(streamsDatum);
 
         try {
+            streamsDatum = appendMetadata(streamsDatum);
+            String docAsJson = docAsJson(streamsDatum.getDocument());
             add(index, type, id, parent,
                     streamsDatum.getTimestamp() == null ? Long.toString(DateTime.now().getMillis())
: Long.toString(streamsDatum.getTimestamp().getMillis()),
-                    convertAndAppendMetadata(streamsDatum));
+                    docAsJson);
         } catch (Throwable e) {
             LOGGER.warn("Unable to Write Datum to ElasticSearch: {}", e.getMessage());
         }
     }
 
-    private String convertAndAppendMetadata(StreamsDatum streamsDatum) throws IOException
{
-        Object object = streamsDatum.getDocument();
+    protected String docAsJson(Object streamsDocument) throws IOException {
+
+        String docAsJson = (streamsDocument instanceof String) ? streamsDocument.toString()
: OBJECT_MAPPER.writeValueAsString(streamsDocument);
+
+        return docAsJson;
+    }
+
+    protected StreamsDatum appendMetadata(StreamsDatum streamsDatum) throws IOException {
+
+        String docAsJson = (streamsDatum.getDocument() instanceof String) ? streamsDatum.getDocument().toString()
: OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
 
-        String docAsJson = (object instanceof String) ? object.toString() : OBJECT_MAPPER.writeValueAsString(object);
         if(streamsDatum.getMetadata() == null || streamsDatum.getMetadata().size() == 0)
-            return docAsJson;
+            return streamsDatum;
         else {
             ObjectNode node = (ObjectNode)OBJECT_MAPPER.readTree(docAsJson);
             node.put("_metadata", OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsBytes(streamsDatum.getMetadata())));
-            return OBJECT_MAPPER.writeValueAsString(node);
+            streamsDatum.setDocument(OBJECT_MAPPER.writeValueAsString(node));
+            return streamsDatum;
         }
     }
 
@@ -494,8 +504,10 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter,
DatumSt
 
         // keep track of the number of totalFailed and items that we have totalOk.
         for (BulkItemResponse resp : bulkItemResponses.getItems()) {
-            if (resp == null || resp.isFailed())
+            if (resp == null || resp.isFailed()) {
                 failed++;
+                LOGGER.debug("{} ({},{},{}) failed: {}", resp.getOpType(), resp.getIndex(),
resp.getType(), resp.getId(), resp.getFailureMessage());
+            }
             else
                 passed++;
         }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f0bf5937/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDatumFromMetadataProcessorIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDatumFromMetadataProcessorIT.java
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDatumFromMetadataProcessorIT.java
new file mode 100644
index 0000000..f672b62
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDatumFromMetadataProcessorIT.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.elasticsearch.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import org.apache.streams.elasticsearch.processor.DatumFromMetadataProcessor;
+import org.apache.streams.elasticsearch.processor.DocumentToMetadataProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by sblackmon on 10/20/14.
+ */
+@ElasticsearchIntegrationTest.ClusterScope(scope= ElasticsearchIntegrationTest.Scope.TEST,
numNodes=1)
+public class TestDatumFromMetadataProcessorIT extends ElasticsearchIntegrationTest {
+
+    private final String TEST_INDEX = "TestDatumFromMetadataProcessor".toLowerCase();
+
+    private ElasticsearchReaderConfiguration testConfiguration;
+
+    @Test
+    public void testSerializability() {
+        DatumFromMetadataProcessor processor = new DatumFromMetadataProcessor(testConfiguration);
+
+        DatumFromMetadataProcessor clone = (DatumFromMetadataProcessor) SerializationUtils.clone(processor);
+    }
+
+    @Before
+    public void prepareTest() {
+
+        testConfiguration = new ElasticsearchReaderConfiguration();
+        testConfiguration.setHosts(Lists.newArrayList("localhost"));
+        testConfiguration.setClusterName(cluster().getClusterName());
+
+        String testJsonString = "{\"dummy\":\"true\"}";
+
+        client().index(client().prepareIndex(TEST_INDEX, "activity", "id").setSource(testJsonString).request()).actionGet(5,
TimeUnit.SECONDS);
+
+    }
+
+    @Test
+    public void testDatumFromMetadataProcessor() {
+
+        Map<String, Object> metadata = Maps.newHashMap();
+
+        metadata.put("index", TEST_INDEX);
+        metadata.put("type", "activity");
+        metadata.put("id", "id");
+
+        DatumFromMetadataProcessor processor = new DatumFromMetadataProcessor(testConfiguration);
+
+        StreamsDatum testInput = new StreamsDatum(null);
+
+        testInput.setMetadata(metadata);
+
+        Assert.assertNull(testInput.document);
+
+        processor.prepare(null);
+
+        StreamsDatum testOutput = processor.process(testInput).get(0);
+
+        processor.cleanUp();
+
+        Assert.assertNotNull(testOutput.document);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f0bf5937/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriter.java
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriter.java
deleted file mode 100644
index e460a1c..0000000
--- a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriter.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.elasticsearch.test;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang.SerializationUtils;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
-import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
-import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
-import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.test.ElasticsearchIntegrationTest;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Created by sblackmon on 10/20/14.
- */
-@ElasticsearchIntegrationTest.ClusterScope(scope= ElasticsearchIntegrationTest.Scope.TEST,
numNodes=1)
-public class TestElasticsearchPersistWriter extends ElasticsearchIntegrationTest {
-
-    private final String TEST_INDEX = "TestElasticsearchPersistWriter".toLowerCase();
-
-    private ElasticsearchWriterConfiguration testConfiguration;
-
-    public void prepareTest() {
-
-        testConfiguration = new ElasticsearchWriterConfiguration();
-        testConfiguration.setHosts(Lists.newArrayList("localhost"));
-        testConfiguration.setClusterName(cluster().getClusterName());
-
-    }
-
-   @Test
-    public void testPersistWriterString() {
-
-        ElasticsearchWriterConfiguration testConfiguration = new ElasticsearchWriterConfiguration();
-        testConfiguration.setHosts(Lists.newArrayList("localhost"));
-        testConfiguration.setClusterName(cluster().getClusterName());
-        testConfiguration.setBatchSize(1l);
-        testConfiguration.setIndex(TEST_INDEX);
-        testConfiguration.setType("string");
-        ElasticsearchPersistWriter testPersistWriter = new ElasticsearchPersistWriter(testConfiguration);
-        testPersistWriter.prepare(null);
-
-        String testJsonString = "{\"dummy\":\"true\"}";
-
-        assert(!indexExists(TEST_INDEX));
-
-        testPersistWriter.write(new StreamsDatum(testJsonString, "test"));
-
-        testPersistWriter.cleanUp();
-
-        flushAndRefresh();
-
-        assert(indexExists(TEST_INDEX));
-
-        long count = client().count(client().prepareCount().request()).actionGet().getCount();
-
-        assert(count > 0);
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f0bf5937/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriterIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriterIT.java
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriterIT.java
new file mode 100644
index 0000000..944843e
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriterIT.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.elasticsearch.test;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.ElasticsearchPersistUpdater;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.elasticsearch.index.query.FilterBuilders;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.junit.Before;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.*;
+
+/**
+ * Created by sblackmon on 10/20/14.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+@ElasticsearchIntegrationTest.ClusterScope(scope= ElasticsearchIntegrationTest.Scope.TEST,
numNodes=1)
+public class TestElasticsearchPersistWriterIT extends ElasticsearchIntegrationTest {
+
+    protected String TEST_INDEX = "TestElasticsearchPersistWriter".toLowerCase();
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TestElasticsearchPersistWriterIT.class);
+
+    private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+    protected ElasticsearchWriterConfiguration testConfiguration;
+
+    @Before
+    public void prepareTest() {
+
+        testConfiguration = new ElasticsearchWriterConfiguration();
+        testConfiguration.setHosts(Lists.newArrayList("localhost"));
+        testConfiguration.setClusterName(cluster().getClusterName());
+        testConfiguration.setIndex("writer");
+        testConfiguration.setType("activity");
+
+    }
+
+    @Test
+    public void testPersist() throws Exception {
+        testPersistWriter();
+        testPersistUpdater();
+    }
+
+    void testPersistWriter() throws Exception {
+
+       assert(!indexExists(TEST_INDEX));
+
+       ElasticsearchPersistWriter testPersistWriter = new ElasticsearchPersistWriter(testConfiguration);
+       testPersistWriter.prepare(null);
+
+       InputStream testActivityFolderStream = TestElasticsearchPersistWriterIT.class.getClassLoader()
+               .getResourceAsStream("activities");
+       List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+
+       for( String file : files) {
+           LOGGER.info("File: " + file );
+           InputStream testActivityFileStream = TestElasticsearchPersistWriterIT.class.getClassLoader()
+                   .getResourceAsStream("activities/" + file);
+           Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class);
+           StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+           testPersistWriter.write( datum );
+           LOGGER.info("Wrote: " + activity.getVerb() );
+       }
+
+       testPersistWriter.cleanUp();
+
+       flushAndRefresh();
+
+       long count = client().count(client().prepareCount().request()).actionGet().getCount();
+
+       assert(count == 89);
+
+    }
+
+    void testPersistUpdater() throws Exception {
+
+        long count = client().count(client().prepareCount().request()).actionGet().getCount();
+
+        ElasticsearchPersistUpdater testPersistUpdater = new ElasticsearchPersistUpdater(testConfiguration);
+        testPersistUpdater.prepare(null);
+
+        InputStream testActivityFolderStream = TestElasticsearchPersistWriterIT.class.getClassLoader()
+                .getResourceAsStream("activities");
+        List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+
+        for( String file : files) {
+            LOGGER.info("File: " + file );
+            InputStream testActivityFileStream = TestElasticsearchPersistWriterIT.class.getClassLoader()
+                    .getResourceAsStream("activities/" + file);
+            Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class);
+            Activity update = new Activity();
+            update.setAdditionalProperty("updated", Boolean.TRUE);
+            update.setAdditionalProperty("str", "str");
+            update.setAdditionalProperty("long", 10l);
+            update.setActor(
+                    new Actor()
+                    .withAdditionalProperty("updated", Boolean.TRUE)
+                    .withAdditionalProperty("double", 10d)
+                    .withAdditionalProperty("map",
+                            MAPPER.createObjectNode().set("field", MAPPER.createArrayNode().add("item"))));
+
+            StreamsDatum datum = new StreamsDatum(update, activity.getVerb());
+            testPersistUpdater.write( datum );
+            LOGGER.info("Updated: " + activity.getVerb() );
+        }
+
+        testPersistUpdater.cleanUp();
+
+        flushAndRefresh();
+
+        long updated = client().prepareCount().setQuery(
+                QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(),
+                        FilterBuilders.existsFilter("updated")
+                )
+        ).execute().actionGet().getCount();
+
+        LOGGER.info("updated: {}", updated);
+
+        assertEquals(count, updated);
+
+        long actorupdated = client().prepareCount().setQuery(
+                QueryBuilders.termQuery("actor.updated", true)
+        ).execute().actionGet().getCount();
+
+        LOGGER.info("actor.updated: {}", actorupdated);
+
+        assertEquals(count, actorupdated);
+
+        long strupdated = client().prepareCount().setQuery(
+                QueryBuilders.termQuery("str", "str")
+        ).execute().actionGet().getCount();
+
+        LOGGER.info("strupdated: {}", strupdated);
+
+        assertEquals(count, strupdated);
+
+        long longupdated = client().prepareCount().setQuery(
+                QueryBuilders.rangeQuery("long").from(9).to(11)
+        ).execute().actionGet().getCount();
+
+        LOGGER.info("longupdated: {}", longupdated);
+
+        assertEquals(count, longupdated);
+
+        long doubleupdated = client().prepareCount().setQuery(
+                QueryBuilders.rangeQuery("long").from(9).to(11)
+        ).execute().actionGet().getCount();
+
+        LOGGER.info("doubleupdated: {}", doubleupdated);
+
+        assertEquals(count, doubleupdated);
+
+        long mapfieldupdated = client().prepareCount().setQuery(
+                QueryBuilders.termQuery("actor.map.field", "item")
+        ).execute().actionGet().getCount();
+
+        LOGGER.info("mapfieldupdated: {}", mapfieldupdated);
+
+        assertEquals(count, mapfieldupdated);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f0bf5937/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriterParentChildIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriterParentChildIT.java
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriterParentChildIT.java
new file mode 100644
index 0000000..e8996ec
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriterParentChildIT.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.elasticsearch.test;
+
+import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.base.Strings;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
+import org.apache.lucene.queryparser.xml.builders.TermQueryBuilder;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchPersistUpdater;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.junit.Before;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+import org.reflections.Reflections;
+import org.reflections.scanners.SubTypesScanner;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.net.URL;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Created by sblackmon on 10/20/14.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+@ElasticsearchIntegrationTest.ClusterScope(scope= ElasticsearchIntegrationTest.Scope.TEST,
numNodes=1)
+public class TestElasticsearchPersistWriterParentChildIT extends ElasticsearchIntegrationTest
{
+
+    protected String TEST_INDEX = "TestElasticsearchPersistWriter".toLowerCase();
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TestElasticsearchPersistWriterParentChildIT.class);
+
+    private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+    protected ElasticsearchWriterConfiguration testConfiguration;
+
+    Set<Class<? extends ActivityObject>> objectTypes;
+
+    List<String> files;
+
+    @Before
+    public void prepareTest() throws Exception {
+
+        testConfiguration = new ElasticsearchWriterConfiguration();
+        testConfiguration.setHosts(Lists.newArrayList("localhost"));
+        testConfiguration.setClusterName(cluster().getClusterName());
+        testConfiguration.setIndex("activity");
+        testConfiguration.setBatchSize(5l);
+
+        PutIndexTemplateRequestBuilder putTemplateRequestBuilder = client().admin().indices().preparePutTemplate("mappings");
+        URL templateURL = TestElasticsearchPersistWriterParentChildIT.class.getResource("/ActivityChildObjectParent.json");
+        ObjectNode template = MAPPER.readValue(templateURL, ObjectNode.class);
+        String templateSource = MAPPER.writeValueAsString(template);
+        putTemplateRequestBuilder.setSource(templateSource);
+
+        client().admin().indices().putTemplate(putTemplateRequestBuilder.request()).actionGet();
+
+        Reflections reflections = new Reflections(new ConfigurationBuilder()
+                .setUrls(ClasspathHelper.forPackage("org.apache.streams.pojo.json"))
+                .setScanners(new SubTypesScanner()));
+        objectTypes = reflections.getSubTypesOf(ActivityObject.class);
+
+        InputStream testActivityFolderStream = TestElasticsearchPersistWriterParentChildIT.class.getClassLoader()
+                .getResourceAsStream("activities");
+        files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+
+    }
+
+    @Test
+    public void testPersist() throws Exception {
+        testPersistWriter();
+        testPersistUpdater();
+    }
+
+    void testPersistWriter() throws Exception {
+
+        assert(!indexExists(TEST_INDEX));
+
+        testConfiguration.setIndex("activity");
+        testConfiguration.setBatchSize(5l);
+
+        ElasticsearchPersistWriter testPersistWriter = new ElasticsearchPersistWriter(testConfiguration);
+        testPersistWriter.prepare(null);
+
+        for( Class objectType : objectTypes ) {
+            Object object = objectType.newInstance();
+            ActivityObject activityObject = MAPPER.convertValue(object, ActivityObject.class);
+            StreamsDatum datum = new StreamsDatum(activityObject, activityObject.getObjectType());
+            datum.getMetadata().put("type", "object");
+            testPersistWriter.write( datum );
+        }
+
+        for( String file : files) {
+            LOGGER.info("File: " + file );
+            InputStream testActivityFileStream = TestElasticsearchPersistWriterParentChildIT.class.getClassLoader()
+                    .getResourceAsStream("activities/" + file);
+            Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class);
+            StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+            if( !Strings.isNullOrEmpty(activity.getObject().getObjectType())) {
+                datum.getMetadata().put("parent", activity.getObject().getObjectType());
+                datum.getMetadata().put("type", "activity");
+                testPersistWriter.write(datum);
+                LOGGER.info("Wrote: " + activity.getVerb());
+            }
+        }
+
+        testPersistWriter.cleanUp();
+
+        flushAndRefresh();
+
+        long parent_count = client().count(client().prepareCount().setTypes("object").request()).actionGet().getCount();
+
+        assertEquals(41, parent_count);
+
+        long child_count = client().count(client().prepareCount().setTypes("activity").request()).actionGet().getCount();
+
+        assertEquals(84, child_count);
+
+    }
+
+    void testPersistUpdater() throws Exception {
+
+        ElasticsearchPersistUpdater testPersistUpdater = new ElasticsearchPersistUpdater(testConfiguration);
+        testPersistUpdater.prepare(null);
+
+        for( String file : files) {
+            LOGGER.info("File: " + file );
+            InputStream testActivityFileStream = TestElasticsearchPersistWriterParentChildIT.class.getClassLoader()
+                    .getResourceAsStream("activities/" + file);
+            Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class);
+            activity.setAdditionalProperty("updated", Boolean.TRUE);
+            StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+            if( !Strings.isNullOrEmpty(activity.getObject().getObjectType())) {
+                datum.getMetadata().put("parent", activity.getObject().getObjectType());
+                datum.getMetadata().put("type", "activity");
+                testPersistUpdater.write(datum);
+                LOGGER.info("Updated: " + activity.getVerb() );
+            }
+        }
+
+        testPersistUpdater.cleanUp();
+
+        flushAndRefresh();
+
+        long child_count = client().count(client().prepareCount().setQuery(QueryBuilders.termQuery("updated",
"true")).setTypes("activity").request()).actionGet().getCount();
+
+        assertEquals(84, child_count);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f0bf5937/streams-contrib/streams-persist-elasticsearch/src/test/resources/ActivityChildObjectParent.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/resources/ActivityChildObjectParent.json
b/streams-contrib/streams-persist-elasticsearch/src/test/resources/ActivityChildObjectParent.json
new file mode 100644
index 0000000..bb8bbae
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/test/resources/ActivityChildObjectParent.json
@@ -0,0 +1,21 @@
+{
+    "$license": [
+      "http://www.apache.org/licenses/LICENSE-2.0"
+    ],
+    "template": "*",
+    "order": 100,
+    "mappings": {
+        "object": {
+            "dynamic": true
+        },
+        "activity": {
+            "_parent": {
+              "type": "object"
+            },
+            "routing": {
+                "required": true
+            },
+            "dynamic": true
+        }
+    }
+}
\ No newline at end of file


Mime
View raw message