streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sblack...@apache.org
Subject [1/2] incubator-streams git commit: resolves STREAMS-284. tested locally
Date Fri, 27 Feb 2015 21:45:57 GMT
Repository: incubator-streams
Updated Branches:
  refs/heads/master ac6841f3f -> 78a49ee76


resolves STREAMS-284.  tested locally


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7538a872
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7538a872
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7538a872

Branch: refs/heads/master
Commit: 7538a872166187d95a3616d61daf9a75c7267533
Parents: 18d14f0
Author: sblackmon <sblackmon@apache.org>
Authored: Tue Feb 17 12:50:39 2015 -0600
Committer: sblackmon <sblackmon@apache.org>
Committed: Tue Feb 17 12:50:39 2015 -0600

----------------------------------------------------------------------
 .../elasticsearch/ElasticsearchMetadataUtil.java        | 12 ++++++++++++
 .../elasticsearch/ElasticsearchPersistReader.java       |  3 +++
 .../elasticsearch/ElasticsearchPersistWriter.java       | 10 +++++++++-
 3 files changed, 24 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7538a872/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java
index c16c8d4..6e85f22 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java
@@ -100,6 +100,18 @@ public class ElasticsearchMetadataUtil {
         return id;
     }
 
+    public static String getParent(StreamsDatum datum) {
+
+        String parent = null;
+
+        Map<String, Object> metadata = datum.getMetadata();
+
+        if( parent == null && metadata != null && metadata.containsKey("parent"))
+            parent = (String) datum.getMetadata().get("parent");
+
+        return parent;
+    }
+
     public static String getId(Map<String, Object> metadata) {
 
         return (String) metadata.get("id");

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7538a872/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index 5411094..3a2f8fb 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -202,6 +202,9 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Seriali
                         DateTime timestamp = new DateTime(((Long) hit.field("_timestamp").getValue()).longValue());
                         item.setTimestamp(timestamp);
                     }
+                    if( hit.fields().containsKey("_parent")) {
+                        item.getMetadata().put("parent", hit.fields().get("_parent").value());
+                    }
                     reader.write(item);
                 } catch (IOException e) {
                     LOGGER.warn("Unable to process json source: ", hit.getSourceAsString());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7538a872/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 2f7dc5c..3df99f9 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -148,9 +148,10 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
         String index = ElasticsearchMetadataUtil.getIndex(metadata, config);
         String type = ElasticsearchMetadataUtil.getType(metadata, config);
         String id = ElasticsearchMetadataUtil.getId(streamsDatum);
+        String parent = ElasticsearchMetadataUtil.getParent(streamsDatum);
 
         try {
-            add(index, type, id,
+            add(index, type, id, parent,
                     streamsDatum.getTimestamp() == null ? Long.toString(DateTime.now().getMillis()) : Long.toString(streamsDatum.getTimestamp().getMillis()),
                     convertAndAppendMetadata(streamsDatum));
         } catch (Throwable e) {
@@ -301,6 +302,10 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
     }
 
     public void add(String indexName, String type, String id, String ts, String json) {
+        add(indexName, type, id, null, ts, json);
+    }
+
+    public void add(String indexName, String type, String id, String parent, String ts, String json) {
 
         // make sure that these are not null
         Preconditions.checkNotNull(indexName);
@@ -318,6 +323,9 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
         if(ts != null)
             indexRequestBuilder.setTimestamp(ts);
 
+        if(parent != null)
+            indexRequestBuilder.setParent(parent);
+
         add(indexRequestBuilder.request());
     }
 


Mime
View raw message