Return-Path: X-Original-To: apmail-streams-commits-archive@minotaur.apache.org Delivered-To: apmail-streams-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 90D0711295 for ; Thu, 11 Sep 2014 17:51:47 +0000 (UTC) Received: (qmail 31614 invoked by uid 500); 11 Sep 2014 17:51:47 -0000 Delivered-To: apmail-streams-commits-archive@streams.apache.org Received: (qmail 31594 invoked by uid 500); 11 Sep 2014 17:51:47 -0000 Mailing-List: contact commits-help@streams.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@streams.incubator.apache.org Delivered-To: mailing list commits@streams.incubator.apache.org Received: (qmail 31585 invoked by uid 99); 11 Sep 2014 17:51:47 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Sep 2014 17:51:47 +0000 X-ASF-Spam-Status: No, hits=-2001.7 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 11 Sep 2014 17:51:45 +0000 Received: (qmail 30702 invoked by uid 99); 11 Sep 2014 17:51:25 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Sep 2014 17:51:25 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 43C109BD01D; Thu, 11 Sep 2014 17:51:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sblackmon@apache.org To: commits@streams.incubator.apache.org Message-Id: <1c013e98eea2402bb4294125e07fc4a6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: Utility Processors to - populate datum from metadata - populate datum from metadata in document field Date: Thu, 11 Sep 2014 17:51:25 +0000 
(UTC) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-streams Updated Branches: refs/heads/STREAMS-170 [created] bc367b309 Utility Processors to - populate datum from metadata - populate datum from metadata in document field Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/bc367b30 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/bc367b30 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/bc367b30 Branch: refs/heads/STREAMS-170 Commit: bc367b3096d685235de2f44ba47999e126d9a141 Parents: 35a8fbf Author: Steve Blackmon Authored: Thu Sep 11 12:51:22 2014 -0500 Committer: Steve Blackmon Committed: Thu Sep 11 12:51:22 2014 -0500 ---------------------------------------------------------------------- .../DatumFromMetadataAsDocumentProcessor.java | 128 +++++++++++++++++++ .../processor/DatumFromMetadataProcessor.java | 91 +++++++++++++ 2 files changed, 219 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bc367b30/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java new file mode 100644 index 0000000..cfad87e --- /dev/null +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java @@ -0,0 +1,128 @@ +package org.apache.streams.elasticsearch.processor; + +import 
com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.typesafe.config.Config; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.elasticsearch.ElasticsearchClientManager; +import org.apache.streams.elasticsearch.ElasticsearchConfigurator; +import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.elasticsearch.action.get.GetRequestBuilder; +import org.elasticsearch.action.get.GetResponse; +import org.joda.time.DateTime; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Created by sblackmon on 9/4/14. 
+ *
+ * Processor that treats each datum's document as a pointer to an Elasticsearch document.
+ * The document (a JSON String, an ObjectNode, or a Map) may name an "index", "type" and "id";
+ * the referenced document is fetched and replaces the datum's document.
+ */
+public class DatumFromMetadataAsDocumentProcessor implements StreamsProcessor, Serializable {
+
+    // Distinct from DatumFromMetadataProcessor.STREAMS_ID (the two were previously identical).
+    public static final String STREAMS_ID = "DatumFromMetadataAsDocumentProcessor";
+
+    // Both are (re)built in prepare(); transient so serializing the processor never tries to
+    // serialize a live client or mapper.
+    private transient ElasticsearchClientManager elasticsearchClientManager;
+    private ElasticsearchReaderConfiguration config;
+
+    private transient ObjectMapper mapper;
+
+    public DatumFromMetadataAsDocumentProcessor() {
+        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
+        this.config = ElasticsearchConfigurator.detectReaderConfiguration(config);
+    }
+
+    public DatumFromMetadataAsDocumentProcessor(Config config) {
+        this.config = ElasticsearchConfigurator.detectReaderConfiguration(config);
+    }
+
+    public DatumFromMetadataAsDocumentProcessor(ElasticsearchReaderConfiguration config) {
+        this.config = config;
+    }
+
+    /**
+     * Replaces the datum's document with the Elasticsearch document it points at.
+     *
+     * Missing "index" / "type" / "id" fields fall back to the first configured index, the first
+     * configured type, and the datum's own id respectively.
+     *
+     * @param entry the datum to re-hydrate; may be null
+     * @return a one-element list holding the updated datum, or an empty list when the datum or
+     *         its document is null, the document is not a JSON object, or the referenced
+     *         Elasticsearch document is missing or has no source
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+        List<StreamsDatum> result = Lists.newArrayList();
+
+        // Guard first: the pointer lives in the document, so that is what must be present.
+        if (entry == null || entry.getDocument() == null) {
+            return result;
+        }
+
+        ObjectNode metadataObjectNode = readMetadata(entry.getDocument());
+        if (metadataObjectNode == null) {
+            return result;
+        }
+
+        Map<String, Object> metadata = asMap(metadataObjectNode);
+
+        // asMap only stores Strings, so these casts are safe.
+        String index = (String) metadata.get("index");
+        String type = (String) metadata.get("type");
+        String id = (String) metadata.get("id");
+
+        if (index == null) {
+            index = this.config.getIndexes().get(0);
+        }
+        if (type == null) {
+            type = this.config.getTypes().get(0);
+        }
+        if (id == null) {
+            id = entry.getId();
+        }
+
+        GetRequestBuilder getRequestBuilder = elasticsearchClientManager.getClient().prepareGet(index, type, id);
+        // Ask for all fields plus _timestamp, and for the _source itself.
+        getRequestBuilder.setFields("*", "_timestamp");
+        getRequestBuilder.setFetchSource(true);
+        GetResponse getResponse = getRequestBuilder.get();
+
+        if (getResponse == null || !getResponse.isExists() || getResponse.isSourceEmpty()) {
+            return result;
+        }
+
+        entry.setDocument(getResponse.getSource());
+        if (getResponse.getField("_timestamp") != null) {
+            Object timestampValue = getResponse.getField("_timestamp").getValue();
+            // The boxed type is not guaranteed to be Long (an Integer broke the old cast);
+            // accept any Number.
+            if (timestampValue instanceof Number) {
+                entry.setTimestamp(new DateTime(((Number) timestampValue).longValue()));
+            }
+        }
+
+        result.add(entry);
+
+        return result;
+    }
+
+    /**
+     * Interprets a datum document as a JSON object.
+     *
+     * @param document a JSON String, an ObjectNode, or a Map
+     * @return the object node, or null when the document is of another type or is not a JSON
+     *         object (such documents are dropped by {@link #process})
+     */
+    private ObjectNode readMetadata(Object document) {
+        if (document instanceof ObjectNode) {
+            return (ObjectNode) document;
+        }
+        if (document instanceof String) {
+            try {
+                return mapper.readValue((String) document, ObjectNode.class);
+            } catch (IOException e) {
+                // Unparseable (or non-object) JSON: drop the datum, as the processor always has.
+                return null;
+            }
+        }
+        if (document instanceof Map) {
+            try {
+                return mapper.convertValue(document, ObjectNode.class);
+            } catch (IllegalArgumentException e) {
+                return null;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        this.elasticsearchClientManager = new ElasticsearchClientManager(config);
+        mapper = StreamsJacksonMapper.getInstance();
+        // NOTE(review): if getInstance() returns a shared singleton, this registers the module
+        // for every other user of it as well - confirm that is intended.
+        mapper.registerModule(new JsonOrgModule());
+    }
+
+    @Override
+    public void cleanUp() {
+        // prepare() may never have run (or may have failed), leaving the manager null.
+        if (this.elasticsearchClientManager != null) {
+            this.elasticsearchClientManager.getClient().close();
+        }
+    }
+
+    /**
+     * Copies the scalar fields of a JSON object into a String-valued map.
+     *
+     * Text, number and boolean fields are stored via {@code asText()}. JSON nulls, objects and
+     * arrays are skipped; previously they became the strings "null" and "" and overrode the
+     * configured defaults.
+     *
+     * @param node a JSON object node
+     * @return a new mutable map of field name to textual value
+     */
+    public Map<String, Object> asMap(JsonNode node) {
+        Map<String, Object> ret = Maps.newHashMap();
+
+        Iterator<Map.Entry<String, JsonNode>> iterator = node.fields();
+        while (iterator.hasNext()) {
+            Map.Entry<String, JsonNode> field = iterator.next();
+            JsonNode value = field.getValue();
+            if (value.isValueNode() && !value.isNull()) {
+                ret.put(field.getKey(), value.asText());
+            }
+        }
+
+        return ret;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bc367b30/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
new file mode 100644
index 0000000..170749d
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
@@ -0,0 +1,91 @@
+package org.apache.streams.elasticsearch.processor;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import
org.apache.streams.elasticsearch.ElasticsearchConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
+import org.elasticsearch.action.get.GetRequestBuilder;
+import org.elasticsearch.action.get.GetResponse;
+import org.joda.time.DateTime;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Created by sblackmon on 9/4/14.
+ *
+ * Processor that re-hydrates a datum from Elasticsearch using coordinates held in the datum's
+ * metadata ("index", "type", "id"), replacing its document with the stored source.
+ */
+public class DatumFromMetadataProcessor implements StreamsProcessor, Serializable {
+
+    public static final String STREAMS_ID = "DatumFromMetadataProcessor";
+
+    // Built in prepare(); transient so serializing the processor never tries to serialize a
+    // live client.
+    private transient ElasticsearchClientManager elasticsearchClientManager;
+    private ElasticsearchReaderConfiguration config;
+
+    public DatumFromMetadataProcessor() {
+        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
+        this.config = ElasticsearchConfigurator.detectReaderConfiguration(config);
+    }
+
+    public DatumFromMetadataProcessor(Config config) {
+        this.config = ElasticsearchConfigurator.detectReaderConfiguration(config);
+    }
+
+    public DatumFromMetadataProcessor(ElasticsearchReaderConfiguration config) {
+        this.config = config;
+    }
+
+    /**
+     * Replaces the datum's document with the Elasticsearch document named by its metadata.
+     *
+     * Missing "index" / "type" / "id" metadata falls back to the first configured index, the
+     * first configured type, and the datum's own id respectively. Non-String metadata values
+     * (e.g. a numeric id) are used via {@code toString()}.
+     *
+     * @param entry the datum to re-hydrate; may be null
+     * @return a one-element list holding the updated datum, or an empty list when the datum or
+     *         its metadata is null, or the referenced document is missing or has no source
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+        List<StreamsDatum> result = Lists.newArrayList();
+
+        if (entry == null || entry.getMetadata() == null) {
+            return result;
+        }
+
+        String index = metadataValue(entry, "index");
+        String type = metadataValue(entry, "type");
+        String id = metadataValue(entry, "id");
+
+        if (index == null) {
+            index = this.config.getIndexes().get(0);
+        }
+        if (type == null) {
+            type = this.config.getTypes().get(0);
+        }
+        if (id == null) {
+            id = entry.getId();
+        }
+
+        GetRequestBuilder getRequestBuilder = elasticsearchClientManager.getClient().prepareGet(index, type, id);
+        // Ask for all fields plus _timestamp, and for the _source itself.
+        getRequestBuilder.setFields("*", "_timestamp");
+        getRequestBuilder.setFetchSource(true);
+        GetResponse getResponse = getRequestBuilder.get();
+
+        if (getResponse == null || !getResponse.isExists() || getResponse.isSourceEmpty()) {
+            return result;
+        }
+
+        entry.setDocument(getResponse.getSource());
+        if (getResponse.getField("_timestamp") != null) {
+            Object timestampValue = getResponse.getField("_timestamp").getValue();
+            // The boxed type is not guaranteed to be Long (an Integer broke the old cast);
+            // accept any Number.
+            if (timestampValue instanceof Number) {
+                entry.setTimestamp(new DateTime(((Number) timestampValue).longValue()));
+            }
+        }
+
+        result.add(entry);
+
+        return result;
+    }
+
+    /**
+     * Reads one metadata entry as a String.
+     *
+     * @return the value's {@code toString()}, or null when the key is absent or maps to null
+     */
+    private static String metadataValue(StreamsDatum entry, String key) {
+        Object value = entry.getMetadata().get(key);
+        return value == null ? null : value.toString();
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        this.elasticsearchClientManager = new ElasticsearchClientManager(config);
+    }
+
+    @Override
+    public void cleanUp() {
+        // prepare() may never have run (or may have failed), leaving the manager null.
+        if (this.elasticsearchClientManager != null) {
+            this.elasticsearchClientManager.getClient().close();
+        }
+    }
+}