streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sblack...@apache.org
Subject git commit: people pattern processor
Date Sun, 31 Aug 2014 17:07:55 GMT
Repository: incubator-streams
Updated Branches:
  refs/heads/pp [created] 2a8b105f5


people pattern processor


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2a8b105f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2a8b105f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2a8b105f

Branch: refs/heads/pp
Commit: 2a8b105f5b4953a619f1c64838dfb418aa05c06f
Parents: ee6b4d5
Author: Steve Blackmon <sblackmon@w2odigital.com>
Authored: Thu Aug 28 19:31:48 2014 -0500
Committer: Steve Blackmon <sblackmon@w2odigital.com>
Committed: Thu Aug 28 19:31:48 2014 -0500

----------------------------------------------------------------------
 streams-contrib/pom.xml                         |   1 +
 .../streams-processor-peoplepattern/pom.xml     | 138 +++++++++++++++
 .../peoplepattern/AccountTypeProcessor.java     | 176 +++++++++++++++++++
 .../peoplepattern/DemographicsProcessor.java    | 169 ++++++++++++++++++
 .../PeoplePatternConfigurator.java              |  52 ++++++
 .../streams/peoplepattern/AccountType.json      |  26 +++
 .../streams/peoplepattern/Demographics.json     |  60 +++++++
 .../PeoplePatternConfiguration.json             |  15 ++
 .../resources/templates/peoplepatternactor.json |  25 +++
 .../apache/streams/data/util/ActivityUtil.java  |  29 ++-
 10 files changed, 685 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2a8b105f/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index 699274e..0230bd1 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -49,6 +49,7 @@
         <!--<module>streams-processor-tika</module>-->
         <module>streams-provider-instagram</module>
         <module>streams-processor-json</module>
+        <module>streams-processor-peoplepattern</module>
         <module>streams-processor-urls</module>
         <module>streams-provider-datasift</module>
         <module>streams-provider-facebook</module>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2a8b105f/streams-contrib/streams-processor-peoplepattern/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/pom.xml b/streams-contrib/streams-processor-peoplepattern/pom.xml
new file mode 100644
index 0000000..dab8410
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/pom.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>streams-processor-peoplepattern</artifactId>
+    <version>0.1-SNAPSHOT</version>
+
+    <parent>
+        <groupId>org.apache.streams</groupId>
+        <artifactId>streams-contrib</artifactId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.3.5</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+                <plugin>
+                    <groupId>org.codehaus.mojo</groupId>
+                    <artifactId>build-helper-maven-plugin</artifactId>
+                    <executions>
+                        <execution>
+                            <id>add-source</id>
+                            <phase>generate-sources</phase>
+                            <goals>
+                                <goal>add-source</goal>
+                            </goals>
+                            <configuration>
+                                <sources>
+                                    <source>target/generated-sources/jsonschema2pojo/**/*.java</source>
+                                </sources>
+                            </configuration>
+                        </execution>
+                        <execution>
+                            <id>add-source-jaxb2</id>
+                            <phase>generate-sources</phase>
+                            <goals>
+                                <goal>add-source</goal>
+                            </goals>
+                            <configuration>
+                                <sources>
+                                    <source>target/generated-sources/jaxb2</source>
+                                </sources>
+                            </configuration>
+                        </execution>
+                    </executions>
+                </plugin>
+                <plugin>
+                    <groupId>org.jsonschema2pojo</groupId>
+                    <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+                    <configuration>
+                        <addCompileSourceRoot>true</addCompileSourceRoot>
+                        <generateBuilders>true</generateBuilders>
+                        <sourcePaths>
+                            <sourcePath>src/main/jsonschema/org/apache/streams/peoplepattern</sourcePath>
+                        </sourcePaths>
+                        <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                        <targetPackage>org.apache.streams.peoplepattern</targetPackage>
+                        <useLongIntegers>true</useLongIntegers>
+                        <useJodaDates>true</useJodaDates>
+                    </configuration>
+                    <executions>
+                        <execution>
+                            <goals>
+                                <goal>generate</goal>
+                            </goals>
+                        </execution>
+                    </executions>
+                </plugin>
+        </plugins>
+    </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2a8b105f/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
new file mode 100644
index 0000000..e412639
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.peoplepattern;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Enrich actor with demographics
+ */
+public class AccountTypeProcessor implements StreamsProcessor {
+
+    private final static String STREAMS_ID = "AccountTypeProcessor";
+
+    private final static String EXTENSION = "account_type";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(AccountTypeProcessor.class);
+
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    private PeoplePatternConfiguration peoplePatternConfiguration = null;
+
+    private String authHeader;
+
+    public AccountTypeProcessor() {
+        Config config = StreamsConfigurator.config.getConfig("peoplepattern");
+        peoplePatternConfiguration = PeoplePatternConfigurator.detectPeoplePatternConfiguration(config);
+        LOGGER.info("creating AccountTypeProcessor");
+    }
+
+    public AccountTypeProcessor(PeoplePatternConfiguration peoplePatternConfiguration) {
+        this.peoplePatternConfiguration = peoplePatternConfiguration;
+        LOGGER.info("creating AccountTypeProcessor");
+    }
+
+    String baseUrl = "https://people8accountv02.apiary-mock.com";
+
+    String endpoint = "/v0.2/accounttype/v0.2/accounttype";
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        List<StreamsDatum> result = Lists.newArrayList();
+
+        Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
+
+        Actor actor = activity.getActor();
+
+        URI uri = null;
+
+        String username = (String) ActivityUtil.ensureExtensions(mapper.convertValue(actor,
ObjectNode.class)).get("screenName");
+
+        try {
+            uri = new URIBuilder()
+                    .setScheme("https")
+                    //.setHost("people8accountv02.apiary-mock.com")
+                    //.setPath("/v0.2/accounttype/v0.2/accounttype")
+                    .setHost("api.peoplepattern.com")
+                    .setPath("/v0.2/accounttype/")
+                    .setParameter("id", actor.getId())
+                    .setParameter("name", actor.getDisplayName())
+                    .setParameter("username", username)
+                    .setParameter("description", actor.getSummary())
+                    .build();
+        } catch (URISyntaxException e) {
+            LOGGER.error("URI error {}", uri.toString());
+            return result;
+        }
+
+        HttpGet httpget = new HttpGet(uri);
+        httpget.addHeader("content-type", "application/json");
+        httpget.addHeader("Authorization", String.format("Basic %s", authHeader));
+
+        CloseableHttpClient httpclient = HttpClients.createDefault();
+
+        CloseableHttpResponse response = null;
+        try {
+            //response = httpclient.execute(httpget, context);
+            response = httpclient.execute(httpget);
+            HttpEntity entity = response.getEntity();
+            // TODO: handle rate-limiting
+            if (response.getStatusLine().getStatusCode() == 200 && entity != null)
{
+                String entityString = EntityUtils.toString(entity);
+
+                LOGGER.debug(entityString);
+
+                AccountType accountType = mapper.readValue(entityString, AccountType.class);
+
+                Map<String, Object> extensions = ActivityUtil.ensureExtensions(mapper.convertValue(actor,
ObjectNode.class));
+
+                extensions.put(EXTENSION, accountType);
+
+                actor.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
+
+                LOGGER.debug("Actor: {}", actor);
+
+                activity.setActor(actor);
+
+                entry.setDocument(activity);
+
+                result.add(entry);
+            }
+        } catch (IOException e) {
+            LOGGER.error("IO error {} - {}", uri.toString(), response);
+            return result;
+        } finally {
+            try {
+                response.close();
+            } catch (IOException e) {}
+            try {
+                httpclient.close();
+            } catch (IOException e) {}
+        }
+
+        return result;
+
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        // TODO: one client object
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append(peoplePatternConfiguration.getUsername());
+        stringBuilder.append(":");
+        stringBuilder.append(peoplePatternConfiguration.getPassword());
+        String string = stringBuilder.toString();
+        authHeader = Base64.encodeBase64String(string.getBytes());
+
+    }
+
+    @Override
+    public void cleanUp() {
+        LOGGER.info("shutting down AccountTypeProcessor");
+    }
+};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2a8b105f/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
new file mode 100644
index 0000000..bd7a1b2
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.peoplepattern;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Enrich actor with demographics
+ */
+public class DemographicsProcessor implements StreamsProcessor {
+
+    private final static String STREAMS_ID = "DemographicsProcessor";
+
+    private final static String EXTENSION = "demographics";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(DemographicsProcessor.class);
+
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    private PeoplePatternConfiguration peoplePatternConfiguration = null;
+
+    private String authHeader;
+
+    public DemographicsProcessor() {
+        Config config = StreamsConfigurator.config.getConfig("peoplepattern");
+        peoplePatternConfiguration = PeoplePatternConfigurator.detectPeoplePatternConfiguration(config);
+        LOGGER.info("creating DemographicsProcessor");
+    }
+
+    public DemographicsProcessor(PeoplePatternConfiguration peoplePatternConfiguration) {
+        this.peoplePatternConfiguration = peoplePatternConfiguration;
+        LOGGER.info("creating DemographicsProcessor");
+    }
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        List<StreamsDatum> result = Lists.newArrayList();
+
+        Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
+        //Actor actor = mapper.convertValue(entry.getDocument(), Actor.class);
+        Actor actor = activity.getActor();
+
+        String username = (String) ActivityUtil.ensureExtensions(mapper.convertValue(actor,
ObjectNode.class)).get("screenName");
+
+        URI uri = null;
+        try {
+            uri = new URIBuilder()
+                    .setScheme("https")
+                    //.setHost("people8demographicsv02.apiary-mock.com")
+                    //.setPath("/v0.2/demographics/v0.2/demographics")
+                    .setHost("api.peoplepattern.com")
+                    .setPath("/v0.2/demographics/")
+                    .setParameter("id", actor.getId())
+                    .setParameter("name", actor.getDisplayName())
+                    .setParameter("username", username)
+                    .setParameter("description", actor.getSummary())
+                    .build();
+        } catch (URISyntaxException e) {
+            LOGGER.error("URI error {}", uri.toString());
+            return result;
+        }
+
+        HttpGet httpget = new HttpGet(uri);
+        httpget.addHeader("content-type", "application/json");
+        httpget.addHeader("Authorization", String.format("Basic %s", authHeader));
+
+        CloseableHttpClient httpclient = HttpClients.createDefault();
+
+        CloseableHttpResponse response = null;
+        try {
+            response = httpclient.execute(httpget);
+            HttpEntity entity = response.getEntity();
+            // TODO: handle rate-limiting
+            if (response.getStatusLine().getStatusCode() == 200 && entity != null)
{
+                String entityString = EntityUtils.toString(entity);
+
+                LOGGER.debug(entityString);
+
+                Demographics demographics = mapper.readValue(entityString, Demographics.class);
+
+                Map<String, Object> extensions = ActivityUtil.ensureExtensions(mapper.convertValue(actor,
ObjectNode.class));
+
+                extensions.put(EXTENSION, demographics);
+
+                actor.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
+
+                LOGGER.debug("Actor: {}", actor);
+
+                activity.setActor(actor);
+
+                entry.setDocument(activity);
+
+                result.add(entry);
+            }
+        } catch (IOException e) {
+            LOGGER.error("IO error {} - {}", uri.toString(), response);
+            return result;
+        } finally {
+            try {
+                response.close();
+            } catch (IOException e) {}
+            try {
+                httpclient.close();
+            } catch (IOException e) {}
+        }
+
+        return result;
+
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        // TODO: one client object
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append(peoplePatternConfiguration.getUsername());
+        stringBuilder.append(":");
+        stringBuilder.append(peoplePatternConfiguration.getPassword());
+        String string = stringBuilder.toString();
+        authHeader = Base64.encodeBase64String(string.getBytes());
+    }
+
+    @Override
+    public void cleanUp() {
+        LOGGER.info("shutting down DemographicsProcessor");
+    }
+};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2a8b105f/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/PeoplePatternConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/PeoplePatternConfigurator.java
b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/PeoplePatternConfigurator.java
new file mode 100644
index 0000000..e531d17
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/PeoplePatternConfigurator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.peoplepattern;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ *
+ */
+public class PeoplePatternConfigurator {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(PeoplePatternConfigurator.class);
+    private final static ObjectMapper mapper = new ObjectMapper();
+
+
+    public static PeoplePatternConfiguration detectPeoplePatternConfiguration(Config config)
{
+
+        PeoplePatternConfiguration peoplePatternConfiguration = null;
+        try {
+            peoplePatternConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()),
PeoplePatternConfiguration.class);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        Preconditions.checkNotNull(peoplePatternConfiguration);
+
+        return peoplePatternConfiguration;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2a8b105f/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/AccountType.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/AccountType.json
b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/AccountType.json
new file mode 100644
index 0000000..61ff110
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/AccountType.json
@@ -0,0 +1,26 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType": "org.apache.streams.peoplepattern.AccountType",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "prediction" : {
+            "type" : "string",
+            "enum" : [
+                "person",
+                "organization",
+                "entertainment",
+                "adult",
+                "no-prediction"
+            ],
+            "default": "no-prediction"
+        },
+        "score": {
+            "type": "number"
+        },
+        "id": {
+            "type": "string"
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2a8b105f/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/Demographics.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/Demographics.json
b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/Demographics.json
new file mode 100644
index 0000000..d1f64d8
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/Demographics.json
@@ -0,0 +1,60 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType": "org.apache.streams.peoplepattern.Demographics",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "age": {
+            "type": "object",
+            "properties": {
+                "prediction": {
+                    "type": "integer",
+                    "default": 1990
+                },
+                "score": {
+                    "type": "number"
+                }
+
+            }
+        },
+        "gender" : {
+            "type": "object",
+            "properties": {
+                "prediction": {
+                    "type": "string",
+                    "enum": [
+                        "male",
+                        "female",
+                        "no-prediction"
+                    ],
+                    "default": "no-prediction"
+                },
+                "score": {
+                    "type": "number"
+                }
+            }
+        },
+        "race" : {
+            "type": "object",
+            "properties": {
+                "prediction": {
+                    "type": "string",
+                    "enum": [
+                        "black",
+                        "east-asian",
+                        "hispanic",
+                        "middle-eastern",
+                        "south-asian",
+                        "white",
+                        "no-prediction"
+                    ],
+                    "default": "no-prediction"
+                },
+                "score": {
+                    "type": "number"
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2a8b105f/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/PeoplePatternConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/PeoplePatternConfiguration.json
b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/PeoplePatternConfiguration.json
new file mode 100644
index 0000000..c65f2dc
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/PeoplePatternConfiguration.json
@@ -0,0 +1,15 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType": "org.apache.streams.peoplepattern.PeoplePatternConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "username" : {
+            "type" : "string"
+        },
+        "password": {
+            "type": "string"
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2a8b105f/streams-contrib/streams-processor-peoplepattern/src/main/resources/templates/peoplepatternactor.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/resources/templates/peoplepatternactor.json
b/streams-contrib/streams-processor-peoplepattern/src/main/resources/templates/peoplepatternactor.json
new file mode 100644
index 0000000..9a24c5c
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/resources/templates/peoplepatternactor.json
@@ -0,0 +1,25 @@
+{
+	"order": 20,
+	"template": "*activity*",
+	"settings": {},
+	"mappings": {
+        "activity": {
+            "properties": {
+                "actor": {
+                    "properties": {
+                        "extensions": {
+                            "properties": {
+                                "account_type": {
+                                    "type": "nested"
+                                },
+                                "demographics": {
+                                    "type": "nested"
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2a8b105f/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
index 3684b32..e0373f8 100644
--- a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
+++ b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
@@ -19,6 +19,7 @@
 package org.apache.streams.data.util;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 
@@ -61,7 +62,7 @@ public class ActivityUtil {
      */
     public static final String LOCATION_EXTENSION_COORDINATES = "coordinates";
 
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+    private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
     /**
      * Creates a standard extension property
@@ -70,12 +71,28 @@ public class ActivityUtil {
      */
     @SuppressWarnings("unchecked")
     public static Map<String, Object> ensureExtensions(Activity activity) {
-        Map<String, Object> properties = (Map)activity.getAdditionalProperties().get(EXTENSION_PROPERTY);
-        if(properties == null) {
-            properties = new HashMap<String, Object>();
-            activity.setAdditionalProperty(EXTENSION_PROPERTY, properties);
+        Map<String, Object> extensions = (Map)activity.getAdditionalProperties().get(EXTENSION_PROPERTY);
+        if(extensions == null) {
+            extensions = new HashMap<String, Object>();
+            activity.setAdditionalProperty(EXTENSION_PROPERTY, extensions);
         }
-        return properties;
+        return extensions;
+    }
+
+    /**
+     * Creates a standard extension property
+     * @param object objectnode to create the property in
+     * @return the Map representing the extensions property
+     */
+    @SuppressWarnings("unchecked")
+    public static Map<String, Object> ensureExtensions(ObjectNode object) {
+        ObjectNode extensions = (ObjectNode)object.get(EXTENSION_PROPERTY);
+        if(extensions == null) {
+            extensions =  mapper.createObjectNode();
+            object.put(EXTENSION_PROPERTY, extensions);
+        }
+        Map<String, Object> extensionsMap = mapper.convertValue(extensions, Map.class);
+        return extensionsMap;
     }
 
     /**


Mime
View raw message