streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sblack...@apache.org
Subject [1/4] incubator-streams git commit: implements streams-persist-graph neo4j only writer and reader
Date Wed, 01 Apr 2015 14:54:27 GMT
Repository: incubator-streams
Updated Branches:
  refs/heads/master beaa8dc1d -> 27f38335c


implements streams-persist-graph
neo4j only
writer and reader


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/752b2db8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/752b2db8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/752b2db8

Branch: refs/heads/master
Commit: 752b2db86e4bb14184702582c5cef43f52211345
Parents: 5082690
Author: Steve Blackmon (@steveblackmon) <sblackmon@apache.org>
Authored: Fri Mar 27 06:16:15 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sblackmon@apache.org>
Committed: Fri Mar 27 06:16:15 2015 -0500

----------------------------------------------------------------------
 .../http/provider/SimpleHTTPGetProvider.java    | 174 +---------
 .../http/provider/SimpleHttpProvider.java       | 324 +++++++++++++++++++
 .../components/http/HttpConfiguration.json      |  13 +-
 streams-contrib/pom.xml                         |   1 +
 streams-contrib/streams-persist-graph/README.md |  57 ++++
 streams-contrib/streams-persist-graph/pom.xml   | 113 +++++++
 .../org/apache/streams/graph/GraphHelper.java   |  39 +++
 .../streams/graph/GraphPersistWriter.java       | 203 ++++++++++++
 .../apache/streams/graph/GraphVertexReader.java | 110 +++++++
 .../streams/graph/neo4j/CypherGraphHelper.java  | 180 +++++++++++
 .../streams/graph/GraphConfiguration.json       |  22 ++
 .../graph/GraphEdgeWriterConfiguration.json     |  33 ++
 .../streams/graph/GraphReaderConfiguration.json |  20 ++
 .../graph/GraphVertexWriterConfiguration.json   |  33 ++
 .../streams/graph/GraphWriterConfiguration.json |  21 ++
 .../graph/neo4j/CypherQueryResponse.json        |  43 +++
 .../graph/test/TestNeo4jVertexReader.java       |  84 +++++
 .../src/test/resources/sampleReaderResult.json  | 197 +++++++++++
 .../apache/streams/data/util/PropertyUtil.java  | 123 +++++++
 19 files changed, 1616 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/752b2db8/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
index 118d06b..fae01cc 100644
--- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
@@ -53,178 +53,12 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
- * Provider retrieves contents from an known set of urls and passes all resulting objects downstream
+ * @Deprecated
+ * Replaced by SimpleHttpProvider, which can use GET or POST
  */
-public class SimpleHTTPGetProvider implements StreamsProvider {
+@Deprecated
+public class SimpleHTTPGetProvider extends SimpleHttpProvider {
 
     private final static String STREAMS_ID = "SimpleHTTPGetProcessor";
 
-    // from root config id
-    private final static String EXTENSION = "account_type";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPGetProvider.class);
-
-    protected ObjectMapper mapper;
-
-    protected URIBuilder uriBuilder;
-
-    protected CloseableHttpClient httpclient;
-
-    protected HttpProviderConfiguration configuration;
-
-    protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
-
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
-    //    // authorized only
-//    //private PeoplePatternConfiguration peoplePatternConfiguration = null;
-//    //private String authHeader;
-//
-    public SimpleHTTPGetProvider() {
-        this(HttpConfigurator.detectProviderConfiguration(StreamsConfigurator.config.getConfig("http")));
-    }
-
-    public SimpleHTTPGetProvider(HttpProviderConfiguration providerConfiguration) {
-        LOGGER.info("creating SimpleHTTPGetProvider");
-        LOGGER.info(providerConfiguration.toString());
-        this.configuration = providerConfiguration;
-    }
-
-    /**
-      Override this to add parameters to the request
-     */
-    protected Map<String, String> prepareParams(StreamsDatum entry) {
-
-        return Maps.newHashMap();
-    }
-
-    public HttpGet prepareHttpGet(URI uri) {
-        HttpGet httpget = new HttpGet(uri);
-        httpget.addHeader("content-type", this.configuration.getContentType());
-        return httpget;
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-
-        mapper = StreamsJacksonMapper.getInstance();
-
-        uriBuilder = new URIBuilder()
-            .setScheme(this.configuration.getProtocol())
-            .setHost(this.configuration.getHostname())
-            .setPath(this.configuration.getResourcePath());
-
-        httpclient = HttpClients.createDefault();
-    }
-
-    @Override
-    public void cleanUp() {
-
-        LOGGER.info("shutting down SimpleHTTPGetProvider");
-        try {
-            httpclient.close();
-        } catch (IOException e) {
-            e.printStackTrace();
-        } finally {
-            try {
-                httpclient.close();
-            } catch (IOException e) {
-                e.printStackTrace();
-            } finally {
-                httpclient = null;
-            }
-        }
-    }
-
-    @Override
-    public void startStream() {
-
-    }
-
-    @Override
-    public StreamsResultSet readCurrent() {
-        StreamsResultSet current;
-
-        uriBuilder = uriBuilder.setPath(
-            Joiner.on("/").skipNulls().join(uriBuilder.getPath(), configuration.getResource(), configuration.getResourcePostfix())
-        );
-
-        URI uri;
-        try {
-            uri = uriBuilder.build();
-        } catch (URISyntaxException e) {
-            uri = null;
-        }
-
-        List<ObjectNode> results = executeGet(uri);
-
-        lock.writeLock().lock();
-
-        for( ObjectNode item : results ) {
-            providerQueue.add(new StreamsDatum(item, item.get("id").asText(), new DateTime(item.get("timestamp").asText())));
-        }
-
-        LOGGER.debug("Creating new result set for {} items", providerQueue.size());
-        current = new StreamsResultSet(providerQueue);
-
-        return current;
-    }
-
-    protected List<ObjectNode> executeGet(URI uri) {
-
-        Preconditions.checkNotNull(uri);
-
-        List<ObjectNode> results = new ArrayList<>();
-
-        HttpGet httpget = prepareHttpGet(uri);
-
-        CloseableHttpResponse response = null;
-
-        String entityString = null;
-        try {
-            response = httpclient.execute(httpget);
-            HttpEntity entity = response.getEntity();
-            // TODO: handle retry
-            if (response.getStatusLine().getStatusCode() == 200 && entity != null) {
-                entityString = EntityUtils.toString(entity);
-                if( !entityString.equals("{}") && !entityString.equals("[]") ) {
-                    JsonNode jsonNode = mapper.readValue(entityString, JsonNode.class);
-                    if (jsonNode != null && jsonNode instanceof ObjectNode ) {
-
-                        results.add((ObjectNode) jsonNode);
-                    } else if (jsonNode != null && jsonNode instanceof ArrayNode) {
-                        ArrayNode arrayNode = (ArrayNode) jsonNode;
-                        Iterator<JsonNode> iterator = arrayNode.elements();
-                        while (iterator.hasNext()) {
-                            ObjectNode element = (ObjectNode) iterator.next();
-
-                            results.add(element);
-                        }
-                    }
-                }
-            }
-        } catch (IOException e) {
-            LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, e.getMessage());
-        } finally {
-            try {
-                response.close();
-            } catch (IOException e) {}
-        }
-        return results;
-    }
-
-    @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        return null;
-    }
-
-    @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return null;
-    }
-
-    @Override
-    public boolean isRunning() {
-        return true;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/752b2db8/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java
new file mode 100644
index 0000000..fc00db9
--- /dev/null
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.components.http.provider;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLContextBuilder;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.streams.components.http.HttpConfigurator;
+import org.apache.streams.components.http.HttpProviderConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Provider retrieves contents from an known set of urls and passes all resulting objects downstream
+ */
+public class SimpleHttpProvider implements StreamsProvider {
+
+    private final static String STREAMS_ID = "SimpleHttpProvider";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(SimpleHttpProvider.class);
+
+    protected ObjectMapper mapper;
+
+    protected URIBuilder uriBuilder;
+
+    protected CloseableHttpClient httpclient;
+
+    protected HttpProviderConfiguration configuration;
+
+    protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    private ExecutorService executor;
+
+    public SimpleHttpProvider() {
+        this(HttpConfigurator.detectProviderConfiguration(StreamsConfigurator.config.getConfig("http")));
+    }
+
+    public SimpleHttpProvider(HttpProviderConfiguration providerConfiguration) {
+        LOGGER.info("creating SimpleHttpProvider");
+        LOGGER.info(providerConfiguration.toString());
+        this.configuration = providerConfiguration;
+    }
+
+    /**
+      Override this to add parameters to the request
+     */
+    protected Map<String, String> prepareParams(StreamsDatum entry) {
+
+        return Maps.newHashMap();
+    }
+
+    public HttpRequestBase prepareHttpRequest(URI uri) {
+        HttpRequestBase request;
+        if( configuration.getRequestMethod().equals(HttpProviderConfiguration.RequestMethod.GET)) {
+            request = new HttpGet(uri);
+        } else if( configuration.getRequestMethod().equals(HttpProviderConfiguration.RequestMethod.POST)) {
+            request = new HttpPost(uri);
+        } else {
+            // this shouldn't happen because of the default
+            request = new HttpGet(uri);
+        }
+
+        request.addHeader("content-type", this.configuration.getContentType());
+
+        return request;
+
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+        mapper = StreamsJacksonMapper.getInstance();
+
+        uriBuilder = new URIBuilder()
+            .setScheme(this.configuration.getProtocol())
+            .setHost(this.configuration.getHostname())
+            .setPort(this.configuration.getPort().intValue())
+            .setPath(this.configuration.getResourcePath());
+
+        SSLContextBuilder builder = new SSLContextBuilder();
+        SSLConnectionSocketFactory sslsf = null;
+        try {
+            builder.loadTrustMaterial(null, new TrustSelfSignedStrategy());
+            sslsf = new SSLConnectionSocketFactory(
+                    builder.build(), SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
+        } catch (NoSuchAlgorithmException e) {
+            LOGGER.warn(e.getMessage());
+        } catch (KeyManagementException e) {
+            LOGGER.warn(e.getMessage());
+        } catch (KeyStoreException e) {
+            LOGGER.warn(e.getMessage());
+        }
+
+        httpclient = HttpClients.custom().setSSLSocketFactory(
+                sslsf).build();
+
+        executor = Executors.newSingleThreadExecutor();
+
+    }
+
+    @Override
+    public void cleanUp() {
+
+        LOGGER.info("shutting down SimpleHttpProvider");
+        this.shutdownAndAwaitTermination(executor);
+        try {
+            httpclient.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        } finally {
+            try {
+                httpclient.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            } finally {
+                httpclient = null;
+            }
+        }
+    }
+
+    @Override
+    public void startStream() {
+
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+
+                readCurrent();
+
+                Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
+
+            }
+        });
+    }
+
+    @Override
+    public StreamsResultSet readCurrent() {
+        StreamsResultSet current;
+
+        uriBuilder = uriBuilder.setPath(
+            Joiner.on("/").skipNulls().join(uriBuilder.getPath(), configuration.getResource(), configuration.getResourcePostfix())
+        );
+
+        URI uri;
+        try {
+            uri = uriBuilder.build();
+        } catch (URISyntaxException e) {
+            uri = null;
+        }
+
+        List<ObjectNode> results = execute(uri);
+
+        lock.writeLock().lock();
+
+        for( ObjectNode item : results ) {
+            providerQueue.add(newDatum(item));
+        }
+
+        LOGGER.debug("Creating new result set for {} items", providerQueue.size());
+        current = new StreamsResultSet(providerQueue);
+
+        return current;
+    }
+
+    protected List<ObjectNode> execute(URI uri) {
+
+        Preconditions.checkNotNull(uri);
+
+        List<ObjectNode> results = new ArrayList<>();
+
+        HttpRequestBase httpRequest = prepareHttpRequest(uri);
+
+        CloseableHttpResponse response = null;
+
+        String entityString = null;
+        try {
+            response = httpclient.execute(httpRequest);
+            HttpEntity entity = response.getEntity();
+            // TODO: handle retry
+            if (response.getStatusLine().getStatusCode() == 200 && entity != null) {
+                entityString = EntityUtils.toString(entity);
+                if( !entityString.equals("{}") && !entityString.equals("[]") ) {
+                    JsonNode jsonNode = mapper.readValue(entityString, JsonNode.class);
+                    results = parse(jsonNode);
+                }
+            }
+        } catch (IOException e) {
+            LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, e.getMessage());
+        } finally {
+            try {
+                response.close();
+            } catch (IOException e) {}
+        }
+        return results;
+    }
+
+    /**
+     Override this to change how entity gets converted to objects
+     */
+    protected List<ObjectNode> parse(JsonNode jsonNode) {
+
+        List<ObjectNode> results = new ArrayList<>();
+
+        if (jsonNode != null && jsonNode instanceof ObjectNode ) {
+            results.add((ObjectNode) jsonNode);
+        } else if (jsonNode != null && jsonNode instanceof ArrayNode) {
+            ArrayNode arrayNode = (ArrayNode) jsonNode;
+            Iterator<JsonNode> iterator = arrayNode.elements();
+            while (iterator.hasNext()) {
+                ObjectNode element = (ObjectNode) iterator.next();
+
+                results.add(element);
+            }
+        }
+
+        return results;
+    }
+
+    /**
+     Override this to change how metadata is derived from object
+     */
+    protected StreamsDatum newDatum(ObjectNode item) {
+        return new StreamsDatum(item, item.get("id").asText(), new DateTime(item.get("timestamp").asText()));
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return true;
+    }
+
+    protected void shutdownAndAwaitTermination(ExecutorService pool) {
+        pool.shutdown(); // Disable new tasks from being submitted
+        try {
+            // Wait a while for existing tasks to terminate
+            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+                pool.shutdownNow(); // Cancel currently executing tasks
+                // Wait a while for tasks to respond to being cancelled
+                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
+                    LOGGER.error("Pool did not terminate");
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted
+            pool.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/752b2db8/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json
index 7c5006c..4d1fbdc 100644
--- a/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json
+++ b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json
@@ -1,10 +1,7 @@
 {
+    "type": "object",
     "$schema": "http://json-schema.org/draft-03/schema",
-    "$license": [
-        "http://www.apache.org/licenses/LICENSE-2.0"
-    ],
     "id": "#",
-    "type": "object",
     "javaType" : "org.apache.streams.components.http.HttpConfiguration",
     "javaInterfaces": ["java.io.Serializable"],
     "properties": {
@@ -28,6 +25,14 @@
             "description": "Resource Path",
             "required" : true
         },
+        "requestMethod": {
+            "type": "string",
+            "enum": [
+                "GET",
+                "POST"
+            ],
+            "default": "GET"
+        },
         "content-type": {
             "type": "string",
             "description": "Resource content-type",

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/752b2db8/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index c52a4df..4827dd2 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -43,6 +43,7 @@
         <module>streams-persist-filebuffer</module>
         <module>streams-persist-hbase</module>
         <module>streams-persist-hdfs</module>
+        <module>streams-persist-graph</module>
         <module>streams-persist-kafka</module>
         <module>streams-persist-mongo</module>
         <module>streams-amazon-aws</module>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/752b2db8/streams-contrib/streams-persist-graph/README.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/README.md b/streams-contrib/streams-persist-graph/README.md
new file mode 100644
index 0000000..476d02e
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/README.md
@@ -0,0 +1,57 @@
+Apache Streams (incubating)
+Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
+--------------------------------------------------------------------------------
+
+streams-persist-graph
+=====================
+
+Build a graph index of a stream
+
+Example Neo4J writer configuration:
+
+    {
+        "graph": {
+            "type": "neo4j",
+            "protocol": "http",
+            "hostname": "localhost",
+            "port": 7474,
+            "graph": "data"
+            "vertices": {
+                "objects": [
+                    "actor",
+                    "object"
+                ],
+                "verbs": [
+                    "follow"
+                ],
+                "objectTypes": [
+                    "page"
+                ]
+            }
+        },
+    }
+
+Example Neo4J reader configuration:
+
+    {
+        "graph": {
+            "type": "neo4j",
+            "protocol": "http",
+            "hostname": "localhost",
+            "port": 7474,
+            "graph": "data"
+            "query": {
+                "objects": [
+                    "actor",
+                    "object"
+                ],
+                "verbs": [
+                    "follow"
+                ],
+                "objectTypes": [
+                    "page"
+                ]
+            }
+        },
+    }
+

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/752b2db8/streams-contrib/streams-persist-graph/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/pom.xml b/streams-contrib/streams-persist-graph/pom.xml
new file mode 100644
index 0000000..2306cd4
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/pom.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>streams-contrib</artifactId>
+        <groupId>org.apache.streams</groupId>
+        <version>0.2-incubating-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>streams-persist-graph</artifactId>
+    <name>streams-persist-graph</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+            <version>1.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-util</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-http</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.antlr</groupId>
+            <artifactId>stringtemplate</artifactId>
+            <version>4.0.2</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jsonschema2pojo</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.jsonschema2pojo</groupId>
+                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+                <configuration>
+                    <addCompileSourceRoot>true</addCompileSourceRoot>
+                    <generateBuilders>true</generateBuilders>
+                    <sourcePaths>
+                        <sourcePath>src/main/jsonschema</sourcePath>
+                    </sourcePaths>
+                    <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                    <targetPackage>org.apache.streams.graph.pojo</targetPackage>
+                    <useLongIntegers>true</useLongIntegers>
+                    <useJodaDates>true</useJodaDates>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/752b2db8/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHelper.java
new file mode 100644
index 0000000..8964bae
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHelper.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.graph;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+
+/**
+ * Interface for methods allowing persistance to a graph database wrapped with
+ * a rest API.  CypherGraphHelper is a good example, for neo4j.
+ */
+public interface GraphHelper {
+
+    public ObjectNode getVertexRequest(String id);
+
+    public ObjectNode createVertexRequest(ActivityObject activityObject);
+
+    public ObjectNode mergeVertexRequest(ActivityObject activityObject);
+
+    public ObjectNode createEdgeRequest(Activity activity, ActivityObject source, ActivityObject destination);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/752b2db8/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java
new file mode 100644
index 0000000..988ae4a
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.graph;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.util.EntityUtils;
+import org.apache.streams.components.http.HttpPersistWriterConfiguration;
+import org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.graph.neo4j.CypherGraphHelper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Adds activityobjects as vertices and activities as edges to a graph database with
+ * an http rest endpoint (such as neo4j)
+ */
+public class GraphPersistWriter extends SimpleHTTPPostPersistWriter {
+
+    public static final String STREAMS_ID = GraphPersistWriter.class.getCanonicalName();
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(GraphPersistWriter.class);
+    private final static long MAX_WRITE_LATENCY = 1000;
+
+    protected GraphWriterConfiguration configuration;
+
+    protected GraphHelper graphHelper;
+
+    private static ObjectMapper mapper;
+
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public GraphPersistWriter() {
+        this(new ComponentConfigurator<GraphWriterConfiguration>(GraphWriterConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph")));
+    }
+
+    public GraphPersistWriter(GraphWriterConfiguration configuration) {
+        super(mapper.convertValue(configuration, HttpPersistWriterConfiguration.class));
+        if( configuration.getType().equals(GraphConfiguration.Type.NEO_4_J)) {
+            super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit");
+        }
+        else if( configuration.getType().equals(GraphConfiguration.Type.REXSTER)) {
+            super.configuration.setResourcePath("/graphs/" + configuration.getGraph());
+        }
+        this.configuration = configuration;
+    }
+
+    @Override
+    protected ObjectNode preparePayload(StreamsDatum entry) {
+
+        Activity activity = null;
+
+        if (entry.getDocument() instanceof Activity) {
+            activity = (Activity) entry.getDocument();
+        } else if (entry.getDocument() instanceof ObjectNode) {
+            activity = mapper.convertValue(entry.getDocument(), Activity.class);
+        } else if (entry.getDocument() instanceof String) {
+            try {
+                activity = mapper.readValue((String) entry.getDocument(), Activity.class);
+            } catch (Throwable e) {
+                LOGGER.warn(e.getMessage());
+            }
+        }
+
+        Preconditions.checkNotNull(activity);
+
+        ObjectNode request = mapper.createObjectNode();
+        ArrayNode statements = mapper.createArrayNode();
+
+        activity.getActor().setObjectType("page");
+
+        // always add vertices first
+        // what types of verbs are relevant for adding vertices?
+        if( configuration.getVertices().getVerbs().contains(activity.getVerb())) {
+
+            // what objects and objectTypes are relevant for adding vertices?
+            if( configuration.getVertices().getObjects().contains("actor") &&
+                configuration.getVertices().getObjectTypes().contains(activity.getActor().getObjectType())) {
+                statements.add(graphHelper.mergeVertexRequest(activity.getActor()));
+            }
+            if( configuration.getVertices().getObjects().contains("object") &&
+                configuration.getVertices().getObjectTypes().contains(activity.getObject().getObjectType())) {
+                statements.add(graphHelper.mergeVertexRequest(activity.getObject()));
+            }
+            if( configuration.getVertices().getObjects().contains("provider") &&
+                configuration.getVertices().getObjectTypes().contains(activity.getProvider().getObjectType())) {
+                statements.add(graphHelper.mergeVertexRequest(activity.getProvider()));
+            }
+            if( configuration.getVertices().getObjects().contains("target") &&
+                configuration.getVertices().getObjectTypes().contains(activity.getTarget().getObjectType())) {
+                statements.add(graphHelper.mergeVertexRequest(activity.getProvider()));
+            }
+
+        }
+
+        // what types of verbs are relevant for adding edges?
+        if( configuration.getEdges().getVerbs().contains(activity.getVerb())) {
+
+            // what objects and objectTypes are relevant for adding edges?
+            if( configuration.getEdges().getObjects().contains("actor") &&
+                configuration.getEdges().getObjects().contains("object") &&
+                configuration.getEdges().getObjectTypes().contains(activity.getActor().getObjectType()) &&
+                configuration.getEdges().getObjectTypes().contains(activity.getObject().getObjectType())) {
+                statements.add(graphHelper.createEdgeRequest(activity, activity.getActor(), activity.getObject()));
+            }
+            if( configuration.getEdges().getObjects().contains("actor") &&
+                    configuration.getEdges().getObjects().contains("target") &&
+                    configuration.getEdges().getObjectTypes().contains(activity.getActor().getObjectType()) &&
+                    configuration.getEdges().getObjectTypes().contains(activity.getTarget().getObjectType())) {
+                statements.add(graphHelper.createEdgeRequest(activity, activity.getActor(), activity.getTarget()));
+            }
+            if( configuration.getEdges().getObjects().contains("provider") &&
+                configuration.getEdges().getObjects().contains("actor") &&
+                configuration.getEdges().getObjectTypes().contains(activity.getProvider().getObjectType()) &&
+                configuration.getEdges().getObjectTypes().contains(activity.getActor().getObjectType())) {
+                statements.add(graphHelper.createEdgeRequest(activity, activity.getProvider(), activity.getActor()));
+            }
+        }
+
+        request.put("statements", statements);
+        return request;
+
+    }
+
+    @Override
+    protected ObjectNode executePost(HttpPost httpPost) {
+
+        Preconditions.checkNotNull(httpPost);
+
+        ObjectNode result = null;
+
+        CloseableHttpResponse response = null;
+
+        String entityString = null;
+        try {
+            response = httpclient.execute(httpPost);
+            HttpEntity entity = response.getEntity();
+            if (response.getStatusLine().getStatusCode() == 200 || response.getStatusLine().getStatusCode() == 201 && entity != null) {
+                entityString = EntityUtils.toString(entity);
+                result = mapper.readValue(entityString, ObjectNode.class);
+            }
+            LOGGER.debug("Writer response:\n{}\n{}\n{}", httpPost.toString(), response.getStatusLine().getStatusCode(), entityString);
+        } catch (IOException e) {
+            LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, e.getMessage());
+        } finally {
+            try {
+                if( response != null) response.close();
+            } catch (IOException e) {}
+        }
+        return result;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+        super.prepare(configurationObject);
+        mapper = StreamsJacksonMapper.getInstance();
+
+        if( configuration.getType().equals(GraphConfiguration.Type.NEO_4_J)) {
+            graphHelper = new CypherGraphHelper();
+        }
+
+        Preconditions.checkNotNull(graphHelper);
+    }
+
+    @Override
+    public void cleanUp() {
+
+        LOGGER.info("exiting");
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/752b2db8/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
new file mode 100644
index 0000000..5910136
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.graph;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.components.http.HttpProviderConfiguration;
+import org.apache.streams.components.http.provider.SimpleHttpProvider;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.data.util.PropertyUtil;
+import org.apache.streams.graph.neo4j.CypherQueryResponse;
+import org.apache.streams.graph.neo4j.ItemData;
+import org.apache.streams.graph.neo4j.ItemMetadata;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Reads a stream of activityobjects from vertices in a graph database with
+ * an http rest endpoint (such as neo4j)
+ */
+public class GraphVertexReader extends SimpleHttpProvider implements StreamsPersistReader {
+
+    public static final String STREAMS_ID = GraphVertexReader.class.getCanonicalName();
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(GraphVertexReader.class);
+
+    protected GraphReaderConfiguration configuration;
+
+    private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    public GraphVertexReader() {
+        this(new ComponentConfigurator<GraphReaderConfiguration>(GraphReaderConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph")));
+    }
+
+    public GraphVertexReader(GraphReaderConfiguration configuration) {
+        super(mapper.convertValue(configuration, HttpProviderConfiguration.class));
+        if( configuration.getType().equals(GraphConfiguration.Type.NEO_4_J))
+            super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit");
+        else if( configuration.getType().equals(GraphConfiguration.Type.REXSTER))
+            super.configuration.setResourcePath("/graphs/" + configuration.getGraph());
+        this.configuration = configuration;
+    }
+
+    /*
+     * Neo API query returns something like this:
+     * { "columns": [ "v" ], "data": [ [ { "data": { props }, etc... } ], [ { "data": { props }, etc... } ] ] }
+     *
+     */
+    public List<ObjectNode> parse(JsonNode jsonNode) {
+        List<ObjectNode> results = Lists.newArrayList();
+
+        ObjectNode root = (ObjectNode) jsonNode;
+
+        CypherQueryResponse cypherQueryResponse = mapper.convertValue(root, CypherQueryResponse.class);
+
+        for( List<List<ItemMetadata>> dataWrapper : cypherQueryResponse.getData()) {
+
+            for (List<ItemMetadata> itemMetadatas : dataWrapper) {
+
+                for (ItemMetadata itemMetadata : itemMetadatas) {
+
+                    ItemData itemData = itemMetadata.getData();
+
+                    LOGGER.debug("itemData: " + itemData);
+
+                    results.add(PropertyUtil.unflattenMap(itemData.getAdditionalProperties(), '.'));
+                }
+
+            }
+
+        }
+        return results;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+        super.prepare(configurationObject);
+
+    }
+
+    @Override
+    public StreamsResultSet readAll() {
+        return readCurrent();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/752b2db8/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphHelper.java
new file mode 100644
index 0000000..1af6f56
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphHelper.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.graph.neo4j;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.streams.data.util.PropertyUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.stringtemplate.v4.ST;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Supporting class for interacting with neo4j via rest API
+ */
+public class CypherGraphHelper implements org.apache.streams.graph.GraphHelper {
+
+    private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    public final static String statementKey = "statement";
+    public final static String paramsKey = "parameters";
+    public final static String propsKey = "props";
+
+    public final static String getVertexStatementTemplate = "MATCH (v {id: '<id>'} ) RETURN v";
+
+    public final static String createVertexStatementTemplate = "MATCH (x {id: '<id>'}) "+
+                                                                "CREATE UNIQUE (n:<type> { props }) "+
+                                                                "RETURN n";
+
+    public final static String mergeVertexStatementTemplate = "MERGE (v:<type> {id: '<id>'}) "+
+                                                               "ON CREATE SET v:<type>, v = { props }, v.`@timestamp` = timestamp() "+
+                                                               "ON MATCH SET v = { props }, v.`@timestamp` = timestamp() "+
+                                                               "RETURN v";
+
+    public final static String createEdgeStatementTemplate = "MATCH (s:<s_type> {id: '<s_id>'}),(d:<d_type> {id: '<d_id>'}) "+
+                                                            "CREATE UNIQUE (s)-[r:<r_type> <r_props>]->(d) "+
+                                                            "RETURN r";
+
+    public ObjectNode getVertexRequest(String id) {
+
+        ObjectNode request = mapper.createObjectNode();
+
+        ST getVertex = new ST(getVertexStatementTemplate);
+        getVertex.add("id", id);
+        request.put(statementKey, getVertex.render());
+
+        return request;
+    }
+
+    public ObjectNode createVertexRequest(ActivityObject activityObject) {
+
+        Preconditions.checkNotNull(activityObject.getObjectType());
+
+        ObjectNode request = mapper.createObjectNode();
+
+        ST createVertex = new ST(createVertexStatementTemplate);
+        createVertex.add("id", activityObject.getId());
+        createVertex.add("type", activityObject.getObjectType());
+        request.put(statementKey, createVertex.render());
+
+        ObjectNode params = mapper.createObjectNode();
+        ObjectNode object = mapper.convertValue(activityObject, ObjectNode.class);
+        ObjectNode props = PropertyUtil.flattenToObjectNode(object, '.');
+        params.put(propsKey, props);
+        request.put(paramsKey, params);
+
+        return request;
+    }
+
+    public ObjectNode mergeVertexRequest(ActivityObject activityObject) {
+
+        Preconditions.checkNotNull(activityObject.getObjectType());
+
+        ObjectNode request = mapper.createObjectNode();
+
+        ST mergeVertex = new ST(mergeVertexStatementTemplate);
+        mergeVertex.add("id", activityObject.getId());
+        mergeVertex.add("type", activityObject.getObjectType());
+
+        ObjectNode params = mapper.createObjectNode();
+        ObjectNode object = mapper.convertValue(activityObject, ObjectNode.class);
+        ObjectNode props = PropertyUtil.flattenToObjectNode(object, '.');
+        params.put(propsKey, props);
+        request.put(paramsKey, params);
+
+        String statement = mergeVertex.render();
+
+        request.put(statementKey, statement);
+
+        return request;
+    }
+
+    public ObjectNode createEdgeRequest(Activity activity, ActivityObject source, ActivityObject destination) {
+
+        ObjectNode request = mapper.createObjectNode();
+
+        // set the activityObject's and extensions null, because their properties don't need to appear on the relationship
+        activity.setActor(null);
+        activity.setObject(null);
+        activity.setTarget(null);
+        activity.getAdditionalProperties().put("extensions", null);
+
+        ObjectNode object = mapper.convertValue(activity, ObjectNode.class);
+        Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
+
+        ST mergeEdge = new ST(createEdgeStatementTemplate);
+        mergeEdge.add("s_id", source.getId());
+        mergeEdge.add("s_type", source.getObjectType());
+        mergeEdge.add("d_id", destination.getId());
+        mergeEdge.add("d_type", destination.getObjectType());
+        mergeEdge.add("r_id", activity.getId());
+        mergeEdge.add("r_type", activity.getVerb());
+        mergeEdge.add("r_props", getPropertyCreater(props));
+
+        String statement = mergeEdge.render();
+        request.put(statementKey, statement);
+
+        return request;
+    }
+
+    public static String getPropertyValueSetter(Map<String, Object> map, String symbol) {
+        StringBuilder builder = new StringBuilder();
+        for( Map.Entry<String, Object> entry : map.entrySet()) {
+            if( entry.getValue() instanceof String ) {
+                String propVal = (String)(entry.getValue());
+                builder.append("," + symbol + ".`" + entry.getKey() + "` = '" + propVal + "'");
+            }
+        }
+        return builder.toString();
+    }
+
+    public static String getPropertyParamSetter(Map<String, Object> map, String symbol) {
+        StringBuilder builder = new StringBuilder();
+        for( Map.Entry<String, Object> entry : map.entrySet()) {
+            if( entry.getValue() instanceof String ) {
+                String propVal = (String)(entry.getValue());
+                builder.append("," + symbol + ".`" + entry.getKey() + "` = '" + propVal + "'");
+            }
+        }
+        return builder.toString();
+    }
+
+    public static String getPropertyCreater(Map<String, Object> map) {
+        StringBuilder builder = new StringBuilder();
+        builder.append("{");
+        List<String> parts = Lists.newArrayList();
+        for( Map.Entry<String, Object> entry : map.entrySet()) {
+            if( entry.getValue() instanceof String ) {
+                String propVal = (String) (entry.getValue());
+                parts.add("`"+entry.getKey() + "`:'" + propVal + "'");
+            }
+        }
+        builder.append(Joiner.on(",").join(parts));
+        builder.append("}");
+        return builder.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/752b2db8/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json
new file mode 100644
index 0000000..de92443
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json
@@ -0,0 +1,22 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "$license": [
+        "http://www.apache.org/licenses/LICENSE-2.0"
+    ],
+    "id": "#",
+    "javaType" : "org.apache.streams.graph.GraphConfiguration",
+    "extends" : {"$ref":"../../../../../../../../../streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "type": {
+            "type": "string",
+            "description": "Graph DB type",
+            "enum" : ["neo4j", "rexster"]
+        },
+        "graph": {
+            "type": "string",
+            "description": "Graph DB Graph ID"
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/752b2db8/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json
new file mode 100644
index 0000000..f14f52c
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json
@@ -0,0 +1,33 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "$license": [
+        "http://www.apache.org/licenses/LICENSE-2.0"
+    ],
+    "id": "#",
+    "javaType" : "org.apache.streams.graph.GraphEdgeWriterConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "objects": {
+            "type": "array",
+            "required": false,
+            "items": {
+                "type": "string"
+            }
+        },
+        "verbs": {
+            "type": "array",
+            "required": false,
+            "items": {
+                "type": "string"
+            }
+        },
+        "objectTypes": {
+            "type": "array",
+            "required": false,
+            "items": {
+                "type": "string"
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/752b2db8/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphReaderConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphReaderConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphReaderConfiguration.json
new file mode 100644
index 0000000..ca588f3
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphReaderConfiguration.json
@@ -0,0 +1,20 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "$license": [
+        "http://www.apache.org/licenses/LICENSE-2.0"
+    ],
+    "id": "#",
+    "javaType" : "org.apache.streams.graph.GraphReaderConfiguration",
+    "extends" : {"$ref":"GraphConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "query": {
+            "type": "string",
+            "required": "true"
+        },
+        "params": {
+            "type": "object"
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/752b2db8/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json
new file mode 100644
index 0000000..66fc236
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json
@@ -0,0 +1,33 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "$license": [
+        "http://www.apache.org/licenses/LICENSE-2.0"
+    ],
+    "id": "#",
+    "javaType" : "org.apache.streams.graph.GraphVertexWriterConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "objects": {
+            "type": "array",
+            "required": false,
+            "items": {
+                "type": "string"
+            }
+        },
+        "verbs": {
+            "type": "array",
+            "required": false,
+            "items": {
+                "type": "string"
+            }
+        },
+        "objectTypes": {
+            "type": "array",
+            "required": false,
+            "items": {
+                "type": "string"
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/752b2db8/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphWriterConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphWriterConfiguration.json
new file mode 100644
index 0000000..0c7e304
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphWriterConfiguration.json
@@ -0,0 +1,21 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "$license": [
+        "http://www.apache.org/licenses/LICENSE-2.0"
+    ],
+    "id": "#",
+    "javaType" : "org.apache.streams.graph.GraphWriterConfiguration",
+    "extends" : {"$ref":"GraphConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "vertices": {
+            "type": "object",
+            "javaType": "org.apache.streams.graph.GraphVertexWriterConfiguration"
+        },
+        "edges": {
+            "type": "object",
+            "javaType": "org.apache.streams.graph.GraphEdgeWriterConfiguration"
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/752b2db8/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/neo4j/CypherQueryResponse.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/neo4j/CypherQueryResponse.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/neo4j/CypherQueryResponse.json
new file mode 100644
index 0000000..77c6fd7
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/neo4j/CypherQueryResponse.json
@@ -0,0 +1,43 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "$license": [
+        "http://www.apache.org/licenses/LICENSE-2.0"
+    ],
+    "id": "#",
+    "javaType" : "org.apache.streams.graph.neo4j.CypherQueryResponse",
+    "properties": {
+        "columns": {
+            "type": "array",
+            "id": "http://jsonschema.net/columns",
+            "required": false,
+            "items": {
+                "type": "string",
+                "id": "http://jsonschema.net/columns/0",
+                "required": false
+            }
+        },
+        "data": {
+            "type": "array",
+            "required": false,
+            "items": {
+                "type": "array",
+                "required": false,
+                "items": {
+                    "type": "array",
+                    "required": false,
+                    "items": {
+                        "type": "object",
+                        "javaType" : "org.apache.streams.graph.neo4j.ItemMetadata",
+                        "properties": {
+                            "data": {
+                                "type": "object",
+                                "javaType" : "org.apache.streams.graph.neo4j.ItemData"
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/752b2db8/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jVertexReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jVertexReader.java b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jVertexReader.java
new file mode 100644
index 0000000..cbe24f7
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jVertexReader.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.graph.test;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.FileUtils;
+import org.apache.streams.graph.GraphConfiguration;
+import org.apache.streams.graph.GraphVertexReader;
+import org.apache.streams.graph.GraphReaderConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.List;
+
+/**
+ * Unit test for
+ * @see {@link org.apache.streams.graph.GraphVertexReader}
+ *
+ * Test that graph db responses can be converted to streams data
+ */
+public class TestNeo4jVertexReader {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TestNeo4jVertexReader.class);
+
+    private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    private JsonNode sampleReaderResult;
+
+    private GraphReaderConfiguration testConfiguration;
+
+    private GraphVertexReader graphPersistReader;
+
+    @Before
+    public void prepareTest() throws IOException {
+
+        testConfiguration = new GraphReaderConfiguration();
+        testConfiguration.setType(GraphConfiguration.Type.NEO_4_J);
+
+        URL url = this.getClass().getResource("/sampleReaderResult.json");
+
+        graphPersistReader = new GraphVertexReader(testConfiguration);
+        File sampleFile = new File(url.getFile());
+        String sampleText = FileUtils.readFileToString(sampleFile, "utf-8");
+        sampleReaderResult = mapper.readValue(sampleText, JsonNode.class);
+
+    }
+
+    @Test
+    public void testParseNeoResult() throws IOException {
+
+        List<ObjectNode> result = graphPersistReader.parse(sampleReaderResult);
+
+        assert( result.size() == 10);
+
+        assert( result.get(0).get("summary").asText().startsWith("Building"));
+
+        assert( result.get(1).get("extensions").size() == 5);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/752b2db8/streams-contrib/streams-persist-graph/src/test/resources/sampleReaderResult.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/test/resources/sampleReaderResult.json b/streams-contrib/streams-persist-graph/src/test/resources/sampleReaderResult.json
new file mode 100644
index 0000000..8b34ec9
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/test/resources/sampleReaderResult.json
@@ -0,0 +1,197 @@
+{
+    "$license": [
+        "http://www.apache.org/licenses/LICENSE-2.0"
+    ],
+    "columns": [
+        "v"
+    ],
+    "data": [
+        [
+            {
+                "data": {
+                    "summary": "Building products at RelateIQ - http://www.relateiq.com",
+                    "extensions.screenName": "dpatil",
+                    "extensions.location": "Silicon Valley",
+                    "extensions.posts": 10134,
+                    "displayName": "dj patil",
+                    "handle": "dpatil",
+                    "extensions.favorites": 5468,
+                    "image.url": "https://pbs.twimg.com/profile_images/2827560897/e76b7577f7d9545362b9fb65c9db3bb3_normal.png",
+                    "url": "http://www.linkedin.com/in/dpatil",
+                    "objectType": "page",
+                    "@timestamp": 1417557531005,
+                    "id": "id:twitter:14839109",
+                    "extensions.followers": 20062
+                }
+            }
+        ],
+        [
+            {
+                "data": {
+                    "summary": "I am pretty awesome. Check out this simple guide to my tweets: http://bit.ly/r1GnK // former CTO @ Obama for America  // founder of Modest",
+                    "extensions.screenName": "harper",
+                    "extensions.location": "Chicago",
+                    "extensions.posts": 44728,
+                    "displayName": "harper",
+                    "handle": "harper",
+                    "extensions.favorites": 7759,
+                    "image.url": "https://pbs.twimg.com/profile_images/539089638669819905/7F_DiiQE_normal.jpeg",
+                    "url": "http://harperreed.com",
+                    "objectType": "page",
+                    "@timestamp": 1417555923917,
+                    "id": "id:twitter:1497",
+                    "extensions.followers": 28551
+                }
+            }
+        ],
+        [
+            {
+                "data": {
+                    "summary": "CTO at @sustranatalks & @last2left, owner of @structuredquery.  Interested in IA, UX, sustainability, equality, travel, cities, semtech, books...",
+                    "extensions.screenName": "ronmichael",
+                    "extensions.location": "Philadelphia",
+                    "extensions.posts": 12725,
+                    "displayName": "Ron M Zettlemoyer",
+                    "handle": "ronmichael",
+                    "extensions.favorites": 782,
+                    "image.url": "https://pbs.twimg.com/profile_images/533457797024063489/NXLJqfhm_normal.jpeg",
+                    "url": "http://zettlemoyer.com",
+                    "objectType": "page",
+                    "@timestamp": 1417558717717,
+                    "id": "id:twitter:807851",
+                    "extensions.followers": 457
+                }
+            }
+        ],
+        [
+            {
+                "data": {
+                    "summary": "Creating intelligent systems to automate tasks & improve decisions. Entrepreneur, ex Principal Data Scientist @LinkedIn. Machine Learning, Product, Networks",
+                    "extensions.screenName": "peteskomoroch",
+                    "extensions.location": "San Francisco",
+                    "extensions.posts": 14847,
+                    "displayName": "Peter Skomoroch",
+                    "handle": "peteskomoroch",
+                    "extensions.favorites": 7168,
+                    "image.url": "https://pbs.twimg.com/profile_images/3276454686/8f8493dfc040e56ef7ff8f59f9474774_normal.jpeg",
+                    "url": "http://datawrangling.com",
+                    "objectType": "page",
+                    "@timestamp": 1417566210626,
+                    "id": "id:twitter:14344469",
+                    "extensions.followers": 20349
+                }
+            }
+        ],
+        [
+            {
+                "data": {
+                    "summary": "TV Business Expert as seen on @BBC, Channel4 & more, CEO&Founder, Legacy Leaver, #Investor #Traveller, ?AUTHOR OF #1 BESTSELLING BOOK http://t.co/mexaUgNcwk?",
+                    "extensions.screenName": "marshawright",
+                    "extensions.location": "London|Spain|USA|Asia|GLOBAL",
+                    "extensions.posts": 82955,
+                    "displayName": "Real Marsha Wright",
+                    "handle": "marshawright",
+                    "extensions.favorites": 2882,
+                    "image.url": "https://pbs.twimg.com/profile_images/526068997440020480/rXNQhagV_normal.jpeg",
+                    "url": "http://www.MarshaWright.com",
+                    "objectType": "page",
+                    "@timestamp": 1417566667586,
+                    "id": "id:twitter:61608747",
+                    "extensions.followers": 120109
+                }
+            }
+        ],
+        [
+            {
+                "data": {
+                    "summary": "Married to @crystale, dad to @CCEleven & @CircaLuna, investor in @Twitter, @Uber, @Instagram, @kickstarter & other startups. Learn more: http://t.co/nJWB60dIUL.",
+                    "extensions.screenName": "sacca",
+                    "extensions.location": "",
+                    "extensions.posts": 34941,
+                    "displayName": "Chris Sacca",
+                    "handle": "sacca",
+                    "extensions.favorites": 87987,
+                    "image.url": "https://pbs.twimg.com/profile_images/1771648774/Sacca_profile_normal.jpg",
+                    "url": "http://lowercasellc.com/proprietor/",
+                    "objectType": "page",
+                    "@timestamp": 1417559518689,
+                    "id": "id:twitter:586",
+                    "extensions.followers": 1503801
+                }
+            }
+        ],
+        [
+            {
+                "data": {
+                    "extensions.screenName": "AustinTXDaily",
+                    "@timestamp": 1417561360464,
+                    "extensions.location": "Austin, TX",
+                    "extensions.posts": 27400,
+                    "displayName": "Austin Daily",
+                    "handle": "AustinTXDaily",
+                    "extensions.favorites": 0,
+                    "id": "id:twitter:200145841",
+                    "image.url": "https://pbs.twimg.com/profile_images/1175142789/Austin__TX_normal.jpg",
+                    "extensions.followers": 16695,
+                    "objectType": "page"
+                }
+            }
+        ],
+        [
+            {
+                "data": {
+                    "summary": "Leading Social Strategy for @WCGWorld. Hustler. Lakers. Niners. US Marine. Author.",
+                    "extensions.screenName": "Britopian",
+                    "extensions.location": "Silicon Valley, Ca.",
+                    "extensions.posts": 50558,
+                    "displayName": "Michael Brito",
+                    "handle": "Britopian",
+                    "extensions.favorites": 2408,
+                    "image.url": "https://pbs.twimg.com/profile_images/378800000388239413/59a5f995014ddf639b4827b6e3f403eb_normal.jpeg",
+                    "url": "http://linkd.in/britopian",
+                    "objectType": "page",
+                    "@timestamp": 1417564730230,
+                    "id": "id:twitter:844631",
+                    "extensions.followers": 115395
+                }
+            }
+        ],
+        [
+            {
+                "data": {
+                    "summary": "Real journalism for the Internet age",
+                    "extensions.screenName": "dailydot",
+                    "extensions.location": "Online/IRL",
+                    "extensions.posts": 97369,
+                    "displayName": "The Daily Dot",
+                    "handle": "dailydot",
+                    "extensions.favorites": 13677,
+                    "image.url": "https://pbs.twimg.com/profile_images/481571907888615425/R2TXfCgx_normal.png",
+                    "url": "http://dailydot.com/",
+                    "objectType": "page",
+                    "@timestamp": 1417566787760,
+                    "id": "id:twitter:211620426",
+                    "extensions.followers": 210024
+                }
+            }
+        ],
+        [
+            {
+                "data": {
+                    "summary": "Music Lover, Global Citizen, Digital Native, #FutureofWork, #Vegan, @PwCAdvisory on loan to @PwC_UK, Former @IBMSoftware Exec, OPINIONS=OWN",
+                    "extensions.screenName": "katmandelstein",
+                    "extensions.location": "Austinite Abroad in London",
+                    "extensions.posts": 32414,
+                    "displayName": "Kat Mandelstein",
+                    "handle": "katmandelstein",
+                    "extensions.favorites": 1305,
+                    "image.url": "https://pbs.twimg.com/profile_images/378800000674683761/e57c9b3033a246e9b7ef09fea6fedee2_normal.jpeg",
+                    "url": "http://about.me/katmandelstein",
+                    "objectType": "page",
+                    "@timestamp": 1417554091482,
+                    "id": "id:twitter:14529289",
+                    "extensions.followers": 6997
+                }}
+        ]
+    ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/752b2db8/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java
new file mode 100644
index 0000000..398069b
--- /dev/null
+++ b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.data.util;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.ValueNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *  Class transforms nested properties of activities, actors, objects, etc...
+ */
+public class PropertyUtil {
+
+    private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    public static Map<String, Object> flattenToMap(ObjectNode object) {
+        Map<String, Object> flatObject = Maps.newHashMap();
+        addKeys(new String(), object, flatObject, '.');
+        return flatObject;
+    }
+
+    public static ObjectNode flattenToObjectNode(ObjectNode object) {
+        Map<String, Object> flatObject = flattenToMap(object, '.');
+        addKeys(new String(), object, flatObject, '.');
+        return mapper.convertValue(flatObject, ObjectNode.class);
+    }
+
+    public static Map<String, Object> flattenToMap(ObjectNode object, char seperator) {
+        Map<String, Object> flatObject = Maps.newHashMap();
+        addKeys(new String(), object, flatObject, seperator);
+        return flatObject;
+    }
+
+    public static ObjectNode flattenToObjectNode(ObjectNode object, char seperator) {
+        Map<String, Object> flatObject = flattenToMap(object, seperator);
+        addKeys(new String(), object, flatObject, seperator);
+        return mapper.convertValue(flatObject, ObjectNode.class);
+    }
+
+    private static void addKeys(String currentPath, JsonNode jsonNode, Map<String, Object> map, char seperator) {
+        if (jsonNode.isObject()) {
+            ObjectNode objectNode = (ObjectNode) jsonNode;
+            Iterator<Map.Entry<String, JsonNode>> iter = objectNode.fields();
+            String pathPrefix = currentPath.isEmpty() ? "" : currentPath + seperator;
+
+            while (iter.hasNext()) {
+                Map.Entry<String, JsonNode> entry = iter.next();
+                addKeys(pathPrefix + entry.getKey(), entry.getValue(), map, seperator);
+            }
+        } else if (jsonNode.isArray()) {
+            ArrayNode arrayNode = (ArrayNode) jsonNode;
+            map.put(currentPath, arrayNode);
+        } else if (jsonNode.isValueNode()) {
+            ValueNode valueNode = (ValueNode) jsonNode;
+            if( valueNode.isTextual() )
+                map.put(currentPath, valueNode.asText());
+            else if ( valueNode.isNumber() )
+                map.put(currentPath, valueNode);
+        }
+    }
+
+    public static ObjectNode unflattenMap(Map<String, Object> object, char seperator) {
+        return unflattenObjectNode(mapper.convertValue(object, ObjectNode.class), seperator);
+    }
+
+    public static ObjectNode unflattenObjectNode(ObjectNode flatObject, char seperator) {
+        ObjectNode root = mapper.createObjectNode();
+        Iterator<Map.Entry<String, JsonNode>> iter = flatObject.fields();
+        while (iter.hasNext()) {
+            Map.Entry<String, JsonNode> item = iter.next();
+            String fullKey = item.getKey();
+            if( !fullKey.contains(Character.valueOf(seperator).toString())) {
+                root.put(item.getKey(), item.getValue());
+            } else {
+                ObjectNode currentNode = root;
+                List<String> keyParts = Splitter.on(seperator).splitToList(item.getKey());
+                Iterator<String> keyPartIterator = Iterables.limit(Splitter.on(seperator).split(item.getKey()), keyParts.size()-1).iterator();
+                while( keyPartIterator.hasNext()) {
+                    String part = keyPartIterator.next();
+                    if( currentNode.has(part) && currentNode.get(part).isObject() ) {
+                        currentNode = (ObjectNode) currentNode.get(part);
+                    } else {
+                        ObjectNode newNode = mapper.createObjectNode();
+                        currentNode.put(part, newNode);
+                        currentNode = newNode;
+                    }
+                };
+                currentNode.put(keyParts.get(keyParts.size()-1), item.getValue());
+
+            }
+        }
+        return root;
+    }
+
+
+}


Mime
View raw message