Repository: incubator-streams
Updated Branches:
refs/heads/pp [created] 2a8b105f5
people pattern processor
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2a8b105f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2a8b105f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2a8b105f
Branch: refs/heads/pp
Commit: 2a8b105f5b4953a619f1c64838dfb418aa05c06f
Parents: ee6b4d5
Author: Steve Blackmon <sblackmon@w2odigital.com>
Authored: Thu Aug 28 19:31:48 2014 -0500
Committer: Steve Blackmon <sblackmon@w2odigital.com>
Committed: Thu Aug 28 19:31:48 2014 -0500
----------------------------------------------------------------------
streams-contrib/pom.xml | 1 +
.../streams-processor-peoplepattern/pom.xml | 138 +++++++++++++++
.../peoplepattern/AccountTypeProcessor.java | 176 +++++++++++++++++++
.../peoplepattern/DemographicsProcessor.java | 169 ++++++++++++++++++
.../PeoplePatternConfigurator.java | 52 ++++++
.../streams/peoplepattern/AccountType.json | 26 +++
.../streams/peoplepattern/Demographics.json | 60 +++++++
.../PeoplePatternConfiguration.json | 15 ++
.../resources/templates/peoplepatternactor.json | 25 +++
.../apache/streams/data/util/ActivityUtil.java | 29 ++-
10 files changed, 685 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2a8b105f/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index 699274e..0230bd1 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -49,6 +49,7 @@
<!--<module>streams-processor-tika</module>-->
<module>streams-provider-instagram</module>
<module>streams-processor-json</module>
+ <module>streams-processor-peoplepattern</module>
<module>streams-processor-urls</module>
<module>streams-provider-datasift</module>
<module>streams-provider-facebook</module>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2a8b105f/streams-contrib/streams-processor-peoplepattern/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/pom.xml b/streams-contrib/streams-processor-peoplepattern/pom.xml
new file mode 100644
index 0000000..dab8410
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/pom.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>streams-processor-peoplepattern</artifactId>
+ <version>0.1-SNAPSHOT</version>
+
+ <parent>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-contrib</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ </parent>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.3.5</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jsonschema2pojo/**/*.java</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-source-jaxb2</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jaxb2</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+ <configuration>
+ <addCompileSourceRoot>true</addCompileSourceRoot>
+ <generateBuilders>true</generateBuilders>
+ <sourcePaths>
+ <sourcePath>src/main/jsonschema/org/apache/streams/peoplepattern</sourcePath>
+ </sourcePaths>
+ <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+ <targetPackage>org.apache.streams.peoplepattern</targetPackage>
+ <useLongIntegers>true</useLongIntegers>
+ <useJodaDates>true</useJodaDates>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2a8b105f/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
new file mode 100644
index 0000000..e412639
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.peoplepattern;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Enrich actor with demographics
+ */
+public class AccountTypeProcessor implements StreamsProcessor {
+
+ private final static String STREAMS_ID = "AccountTypeProcessor";
+
+ private final static String EXTENSION = "account_type";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(AccountTypeProcessor.class);
+
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ private PeoplePatternConfiguration peoplePatternConfiguration = null;
+
+ private String authHeader;
+
+ public AccountTypeProcessor() {
+ Config config = StreamsConfigurator.config.getConfig("peoplepattern");
+ peoplePatternConfiguration = PeoplePatternConfigurator.detectPeoplePatternConfiguration(config);
+ LOGGER.info("creating AccountTypeProcessor");
+ }
+
+ public AccountTypeProcessor(PeoplePatternConfiguration peoplePatternConfiguration) {
+ this.peoplePatternConfiguration = peoplePatternConfiguration;
+ LOGGER.info("creating AccountTypeProcessor");
+ }
+
+ String baseUrl = "https://people8accountv02.apiary-mock.com";
+
+ String endpoint = "/v0.2/accounttype/v0.2/accounttype";
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+
+ List<StreamsDatum> result = Lists.newArrayList();
+
+ Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
+
+ Actor actor = activity.getActor();
+
+ URI uri = null;
+
+ String username = (String) ActivityUtil.ensureExtensions(mapper.convertValue(actor,
ObjectNode.class)).get("screenName");
+
+ try {
+ uri = new URIBuilder()
+ .setScheme("https")
+ //.setHost("people8accountv02.apiary-mock.com")
+ //.setPath("/v0.2/accounttype/v0.2/accounttype")
+ .setHost("api.peoplepattern.com")
+ .setPath("/v0.2/accounttype/")
+ .setParameter("id", actor.getId())
+ .setParameter("name", actor.getDisplayName())
+ .setParameter("username", username)
+ .setParameter("description", actor.getSummary())
+ .build();
+ } catch (URISyntaxException e) {
+ LOGGER.error("URI error {}", uri.toString());
+ return result;
+ }
+
+ HttpGet httpget = new HttpGet(uri);
+ httpget.addHeader("content-type", "application/json");
+ httpget.addHeader("Authorization", String.format("Basic %s", authHeader));
+
+ CloseableHttpClient httpclient = HttpClients.createDefault();
+
+ CloseableHttpResponse response = null;
+ try {
+ //response = httpclient.execute(httpget, context);
+ response = httpclient.execute(httpget);
+ HttpEntity entity = response.getEntity();
+ // TODO: handle rate-limiting
+ if (response.getStatusLine().getStatusCode() == 200 && entity != null)
{
+ String entityString = EntityUtils.toString(entity);
+
+ LOGGER.debug(entityString);
+
+ AccountType accountType = mapper.readValue(entityString, AccountType.class);
+
+ Map<String, Object> extensions = ActivityUtil.ensureExtensions(mapper.convertValue(actor,
ObjectNode.class));
+
+ extensions.put(EXTENSION, accountType);
+
+ actor.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
+
+ LOGGER.debug("Actor: {}", actor);
+
+ activity.setActor(actor);
+
+ entry.setDocument(activity);
+
+ result.add(entry);
+ }
+ } catch (IOException e) {
+ LOGGER.error("IO error {} - {}", uri.toString(), response);
+ return result;
+ } finally {
+ try {
+ response.close();
+ } catch (IOException e) {}
+ try {
+ httpclient.close();
+ } catch (IOException e) {}
+ }
+
+ return result;
+
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ // TODO: one client object
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append(peoplePatternConfiguration.getUsername());
+ stringBuilder.append(":");
+ stringBuilder.append(peoplePatternConfiguration.getPassword());
+ String string = stringBuilder.toString();
+ authHeader = Base64.encodeBase64String(string.getBytes());
+
+ }
+
+ @Override
+ public void cleanUp() {
+ LOGGER.info("shutting down AccountTypeProcessor");
+ }
+};
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2a8b105f/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
new file mode 100644
index 0000000..bd7a1b2
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.peoplepattern;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Enrich actor with demographics
+ */
+public class DemographicsProcessor implements StreamsProcessor {
+
+ private final static String STREAMS_ID = "DemographicsProcessor";
+
+ private final static String EXTENSION = "demographics";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(DemographicsProcessor.class);
+
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ private PeoplePatternConfiguration peoplePatternConfiguration = null;
+
+ private String authHeader;
+
+ public DemographicsProcessor() {
+ Config config = StreamsConfigurator.config.getConfig("peoplepattern");
+ peoplePatternConfiguration = PeoplePatternConfigurator.detectPeoplePatternConfiguration(config);
+ LOGGER.info("creating DemographicsProcessor");
+ }
+
+ public DemographicsProcessor(PeoplePatternConfiguration peoplePatternConfiguration) {
+ this.peoplePatternConfiguration = peoplePatternConfiguration;
+ LOGGER.info("creating DemographicsProcessor");
+ }
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+
+ List<StreamsDatum> result = Lists.newArrayList();
+
+ Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
+ //Actor actor = mapper.convertValue(entry.getDocument(), Actor.class);
+ Actor actor = activity.getActor();
+
+ String username = (String) ActivityUtil.ensureExtensions(mapper.convertValue(actor,
ObjectNode.class)).get("screenName");
+
+ URI uri = null;
+ try {
+ uri = new URIBuilder()
+ .setScheme("https")
+ //.setHost("people8demographicsv02.apiary-mock.com")
+ //.setPath("/v0.2/demographics/v0.2/demographics")
+ .setHost("api.peoplepattern.com")
+ .setPath("/v0.2/demographics/")
+ .setParameter("id", actor.getId())
+ .setParameter("name", actor.getDisplayName())
+ .setParameter("username", username)
+ .setParameter("description", actor.getSummary())
+ .build();
+ } catch (URISyntaxException e) {
+ LOGGER.error("URI error {}", uri.toString());
+ return result;
+ }
+
+ HttpGet httpget = new HttpGet(uri);
+ httpget.addHeader("content-type", "application/json");
+ httpget.addHeader("Authorization", String.format("Basic %s", authHeader));
+
+ CloseableHttpClient httpclient = HttpClients.createDefault();
+
+ CloseableHttpResponse response = null;
+ try {
+ response = httpclient.execute(httpget);
+ HttpEntity entity = response.getEntity();
+ // TODO: handle rate-limiting
+ if (response.getStatusLine().getStatusCode() == 200 && entity != null)
{
+ String entityString = EntityUtils.toString(entity);
+
+ LOGGER.debug(entityString);
+
+ Demographics demographics = mapper.readValue(entityString, Demographics.class);
+
+ Map<String, Object> extensions = ActivityUtil.ensureExtensions(mapper.convertValue(actor,
ObjectNode.class));
+
+ extensions.put(EXTENSION, demographics);
+
+ actor.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
+
+ LOGGER.debug("Actor: {}", actor);
+
+ activity.setActor(actor);
+
+ entry.setDocument(activity);
+
+ result.add(entry);
+ }
+ } catch (IOException e) {
+ LOGGER.error("IO error {} - {}", uri.toString(), response);
+ return result;
+ } finally {
+ try {
+ response.close();
+ } catch (IOException e) {}
+ try {
+ httpclient.close();
+ } catch (IOException e) {}
+ }
+
+ return result;
+
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ // TODO: one client object
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append(peoplePatternConfiguration.getUsername());
+ stringBuilder.append(":");
+ stringBuilder.append(peoplePatternConfiguration.getPassword());
+ String string = stringBuilder.toString();
+ authHeader = Base64.encodeBase64String(string.getBytes());
+ }
+
+ @Override
+ public void cleanUp() {
+ LOGGER.info("shutting down DemographicsProcessor");
+ }
+};
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2a8b105f/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/PeoplePatternConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/PeoplePatternConfigurator.java
b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/PeoplePatternConfigurator.java
new file mode 100644
index 0000000..e531d17
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/PeoplePatternConfigurator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.peoplepattern;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ *
+ */
+public class PeoplePatternConfigurator {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(PeoplePatternConfigurator.class);
+ private final static ObjectMapper mapper = new ObjectMapper();
+
+
+ public static PeoplePatternConfiguration detectPeoplePatternConfiguration(Config config)
{
+
+ PeoplePatternConfiguration peoplePatternConfiguration = null;
+ try {
+ peoplePatternConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()),
PeoplePatternConfiguration.class);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ Preconditions.checkNotNull(peoplePatternConfiguration);
+
+ return peoplePatternConfiguration;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2a8b105f/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/AccountType.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/AccountType.json
b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/AccountType.json
new file mode 100644
index 0000000..61ff110
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/AccountType.json
@@ -0,0 +1,26 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType": "org.apache.streams.peoplepattern.AccountType",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "prediction" : {
+ "type" : "string",
+ "enum" : [
+ "person",
+ "organization",
+ "entertainment",
+ "adult",
+ "no-prediction"
+ ],
+ "default": "no-prediction"
+ },
+ "score": {
+ "type": "number"
+ },
+ "id": {
+ "type": "string"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2a8b105f/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/Demographics.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/Demographics.json
b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/Demographics.json
new file mode 100644
index 0000000..d1f64d8
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/Demographics.json
@@ -0,0 +1,60 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType": "org.apache.streams.peoplepattern.Demographics",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "age": {
+ "type": "object",
+ "properties": {
+ "prediction": {
+ "type": "integer",
+ "default": 1990
+ },
+ "score": {
+ "type": "number"
+ }
+
+ }
+ },
+ "gender" : {
+ "type": "object",
+ "properties": {
+ "prediction": {
+ "type": "string",
+ "enum": [
+ "male",
+ "female",
+ "no-prediction"
+ ],
+ "default": "no-prediction"
+ },
+ "score": {
+ "type": "number"
+ }
+ }
+ },
+ "race" : {
+ "type": "object",
+ "properties": {
+ "prediction": {
+ "type": "string",
+ "enum": [
+ "black",
+ "east-asian",
+ "hispanic",
+ "middle-eastern",
+ "south-asian",
+ "white",
+ "no-prediction"
+ ],
+ "default": "no-prediction"
+ },
+ "score": {
+ "type": "number"
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2a8b105f/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/PeoplePatternConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/PeoplePatternConfiguration.json
b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/PeoplePatternConfiguration.json
new file mode 100644
index 0000000..c65f2dc
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/PeoplePatternConfiguration.json
@@ -0,0 +1,15 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType": "org.apache.streams.peoplepattern.PeoplePatternConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "username" : {
+ "type" : "string"
+ },
+ "password": {
+ "type": "string"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2a8b105f/streams-contrib/streams-processor-peoplepattern/src/main/resources/templates/peoplepatternactor.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/resources/templates/peoplepatternactor.json
b/streams-contrib/streams-processor-peoplepattern/src/main/resources/templates/peoplepatternactor.json
new file mode 100644
index 0000000..9a24c5c
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/resources/templates/peoplepatternactor.json
@@ -0,0 +1,25 @@
+{
+ "order": 20,
+ "template": "*activity*",
+ "settings": {},
+ "mappings": {
+ "activity": {
+ "properties": {
+ "actor": {
+ "properties": {
+ "extensions": {
+ "properties": {
+ "account_type": {
+ "type": "nested"
+ },
+ "demographics": {
+ "type": "nested"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2a8b105f/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
index 3684b32..e0373f8 100644
--- a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
+++ b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
@@ -19,6 +19,7 @@
package org.apache.streams.data.util;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
@@ -61,7 +62,7 @@ public class ActivityUtil {
*/
public static final String LOCATION_EXTENSION_COORDINATES = "coordinates";
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
/**
* Creates a standard extension property
@@ -70,12 +71,28 @@ public class ActivityUtil {
*/
@SuppressWarnings("unchecked")
public static Map<String, Object> ensureExtensions(Activity activity) {
- Map<String, Object> properties = (Map)activity.getAdditionalProperties().get(EXTENSION_PROPERTY);
- if(properties == null) {
- properties = new HashMap<String, Object>();
- activity.setAdditionalProperty(EXTENSION_PROPERTY, properties);
+ Map<String, Object> extensions = (Map)activity.getAdditionalProperties().get(EXTENSION_PROPERTY);
+ if(extensions == null) {
+ extensions = new HashMap<String, Object>();
+ activity.setAdditionalProperty(EXTENSION_PROPERTY, extensions);
}
- return properties;
+ return extensions;
+ }
+
+ /**
+ * Creates a standard extension property
+ * @param object objectnode to create the property in
+ * @return the Map representing the extensions property
+ */
+ @SuppressWarnings("unchecked")
+ public static Map<String, Object> ensureExtensions(ObjectNode object) {
+ ObjectNode extensions = (ObjectNode)object.get(EXTENSION_PROPERTY);
+ if(extensions == null) {
+ extensions = mapper.createObjectNode();
+ object.put(EXTENSION_PROPERTY, extensions);
+ }
+ Map<String, Object> extensionsMap = mapper.convertValue(extensions, Map.class);
+ return extensionsMap;
}
/**
|