streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sblack...@apache.org
Subject svn commit: r1560191 - in /incubator/streams/trunk/streams-storm/src/main/java/org/apache/streams/storm/trident: StreamsPersistWriterState.java StreamsProviderSpout.java
Date Tue, 21 Jan 2014 21:16:03 GMT
Author: sblackmon
Date: Tue Jan 21 21:16:03 2014
New Revision: 1560191

URL: http://svn.apache.org/r1560191
Log:
storm wrappers for providers and persistwriters

Added:
    incubator/streams/trunk/streams-storm/src/main/java/org/apache/streams/storm/trident/StreamsPersistWriterState.java
    incubator/streams/trunk/streams-storm/src/main/java/org/apache/streams/storm/trident/StreamsProviderSpout.java

Added: incubator/streams/trunk/streams-storm/src/main/java/org/apache/streams/storm/trident/StreamsPersistWriterState.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-storm/src/main/java/org/apache/streams/storm/trident/StreamsPersistWriterState.java?rev=1560191&view=auto
==============================================================================
--- incubator/streams/trunk/streams-storm/src/main/java/org/apache/streams/storm/trident/StreamsPersistWriterState.java
(added)
+++ incubator/streams/trunk/streams-storm/src/main/java/org/apache/streams/storm/trident/StreamsPersistWriterState.java
Tue Jan 21 21:16:03 2014
@@ -0,0 +1,107 @@
+package org.apache.streams.storm.trident;
+
+import backtype.storm.task.IMetricsContext;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseStateUpdater;
+import storm.trident.state.State;
+import storm.trident.state.StateFactory;
+import storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+/**
+ * Trident {@link State} that pushes tuples into a {@link StreamsPersistWriter}.
+ *
+ * Each tuple handed to {@link #bulkMessages(List)} is converted to a
+ * {@link StreamsDatum} by the configured {@link StreamsPersistStateController}
+ * and then passed to the writer. {@link #beginCommit(Long)} and
+ * {@link #commit(Long)} are no-ops.
+ *
+ * Created by sblackmon on 1/16/14.
+ */
+public class StreamsPersistWriterState implements State {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(StreamsPersistWriterState.class);
+
+    StreamsPersistWriter writer;
+    StreamsPersistStateController controller;
+
+    /**
+     * Retained for source compatibility only. No writer can be supplied through
+     * this constructor, so it always fails with a {@link NullPointerException},
+     * exactly as it did before (the writer field was never assigned).
+     *
+     * @param controller converts tuples to datums
+     * @deprecated use {@link #StreamsPersistWriterState(StreamsPersistWriter, StreamsPersistStateController)}
+     */
+    @Deprecated
+    public StreamsPersistWriterState(StreamsPersistStateController controller) {
+        this((StreamsPersistWriter) null, controller);
+    }
+
+    /**
+     * Creates the state and starts the writer.
+     *
+     * @param writer     destination for converted datums; must not be null
+     * @param controller converts tuples to datums; when null a controller
+     *                   reading the default "datum" field is used
+     * @throws NullPointerException if {@code writer} is null
+     */
+    public StreamsPersistWriterState(StreamsPersistWriter writer, StreamsPersistStateController controller) {
+        if (writer == null) {
+            throw new NullPointerException("writer must not be null");
+        }
+        this.writer = writer;
+        // Honour the caller's controller (and its field name) instead of replacing it.
+        this.controller = (controller != null) ? controller : new StreamsPersistStateController();
+        this.writer.start();
+    }
+
+    /**
+     * Converts every tuple to a {@link StreamsDatum} and writes it.
+     * A failure while writing one entry is logged and does not stop the
+     * remaining tuples from being processed. Conversion failures are not
+     * caught here and propagate to the caller.
+     *
+     * @param tuples tuples of the current batch
+     */
+    public void bulkMessages(List<TridentTuple> tuples) {
+        for (TridentTuple tuple : tuples) {
+            StreamsDatum entry = this.controller.fromTuple(tuple);
+            try {
+                writer.write(entry);
+            } catch (Exception e) {
+                // Throwable goes last so SLF4J fills the placeholder with the
+                // entry and logs the exception with its stack trace.
+                LOGGER.error("Exception writing entry : {}", entry, e);
+            }
+        }
+        LOGGER.debug("******** Ending commit");
+    }
+
+    /** No-op: nothing is prepared at the start of a transaction. */
+    @Override
+    public void beginCommit(Long aLong) {
+
+    }
+
+    /** No-op: entries are written as they arrive in {@link #bulkMessages(List)}. */
+    @Override
+    public void commit(Long aLong) {
+
+    }
+
+    /**
+     * {@link StateFactory} that builds {@link StreamsPersistWriterState} instances
+     * from the writer and controller it was constructed with.
+     */
+    public static class Factory implements StateFactory {
+
+        // Static so the logger is not part of the factory's serialized form.
+        private final static Logger FACTORY_LOGGER = LoggerFactory.getLogger(Factory.class);
+
+        // NOTE(review): the factory is serialized with the topology, so the writer
+        // presumably has to be serializable as well -- confirm against StreamsPersistWriter.
+        private StreamsPersistWriter writer;
+        private StreamsPersistStateController controller;
+
+        /**
+         * @param writer     writer handed to every state this factory creates
+         * @param controller tuple-to-datum converter handed to every state
+         */
+        public Factory(StreamsPersistWriter writer, StreamsPersistStateController controller) {
+            this.writer = writer;
+            this.controller = controller;
+        }
+
+        /**
+         * Creates a new state backed by this factory's writer and controller.
+         * None of the arguments are used.
+         */
+        @Override
+        public State makeState(Map map, IMetricsContext iMetricsContext, int i, int i2) {
+            FACTORY_LOGGER.debug("Called makeState. . . ");
+            // convert map to config object
+            return new StreamsPersistWriterState(writer, controller);
+        }
+
+    }
+
+    /**
+     * Extracts the value of one named tuple field and converts it to a
+     * {@link StreamsDatum}.
+     */
+    public static class StreamsPersistStateController implements Serializable {
+
+        // Shared and static: kept out of the serialized form and created once.
+        private final static ObjectMapper MAPPER = new ObjectMapper();
+
+        private String fieldName;
+
+        /** Uses the default field name "datum". */
+        public StreamsPersistStateController() {
+            this.fieldName = "datum";
+        }
+
+        /**
+         * @param fieldName name of the tuple field holding the datum
+         */
+        public StreamsPersistStateController(String fieldName) {
+            this.fieldName = fieldName;
+        }
+
+        /**
+         * @param tuple tuple containing the configured field
+         * @return the field value converted to a {@link StreamsDatum}
+         */
+        public StreamsDatum fromTuple(TridentTuple tuple) {
+            return MAPPER.convertValue(tuple.getValueByField(this.fieldName), StreamsDatum.class);
+        }
+
+    }
+
+    /**
+     * State updater that forwards each batch of tuples to
+     * {@link StreamsPersistWriterState#bulkMessages(List)}.
+     */
+    public static class StreamsPersistWriterSendMessage extends BaseStateUpdater<StreamsPersistWriterState> {
+
+        // Static so the logger is not part of the updater's serialized form.
+        private final static Logger UPDATER_LOGGER = LoggerFactory.getLogger(StreamsPersistWriterSendMessage.class);
+
+        /**
+         * @param writerState      state receiving the tuples
+         * @param tridentTuples    tuples of the current batch
+         * @param tridentCollector unused; nothing is emitted downstream
+         */
+        @Override
+        public void updateState(StreamsPersistWriterState writerState, List<TridentTuple> tridentTuples, TridentCollector tridentCollector) {
+            UPDATER_LOGGER.debug("****  calling send message. .  .");
+            writerState.bulkMessages(tridentTuples);
+        }
+    }
+}

Added: incubator/streams/trunk/streams-storm/src/main/java/org/apache/streams/storm/trident/StreamsProviderSpout.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-storm/src/main/java/org/apache/streams/storm/trident/StreamsProviderSpout.java?rev=1560191&view=auto
==============================================================================
--- incubator/streams/trunk/streams-storm/src/main/java/org/apache/streams/storm/trident/StreamsProviderSpout.java
(added)
+++ incubator/streams/trunk/streams-storm/src/main/java/org/apache/streams/storm/trident/StreamsProviderSpout.java
Tue Jan 21 21:16:03 2014
@@ -0,0 +1,69 @@
+package org.apache.streams.storm.trident;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.commons.collections4.IteratorUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.operation.TridentCollector;
+import storm.trident.spout.IBatchSpout;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Trident batch spout that wraps a {@link StreamsProvider}: the provider is
+ * started when the spout opens, stopped when it closes, and the contents of
+ * its queue are emitted as three-field tuples on every batch.
+ *
+ * Created by sblackmon on 1/16/14.
+ */
+public class StreamsProviderSpout implements IBatchSpout {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(StreamsProviderSpout.class);
+
+    StreamsProvider provider;
+
+    /**
+     * @param provider source of the datums this spout emits
+     */
+    public StreamsProviderSpout(StreamsProvider provider) {
+        this.provider = provider;
+    }
+
+    /** Starts the wrapped provider; both arguments are unused. */
+    @Override
+    public void open(Map map, TopologyContext topologyContext) {
+        this.provider.start();
+    }
+
+    /**
+     * Copies everything reachable through the provider queue's iterator into a
+     * list and emits one tuple per datum.
+     *
+     * NOTE(review): nothing here removes items from the queue -- verify that the
+     * provider drains it, otherwise the same datums may be emitted again.
+     */
+    @Override
+    public synchronized void emitBatch(long l, TridentCollector tridentCollector) {
+        final List<StreamsDatum> snapshot = IteratorUtils.toList(this.provider.getProviderQueue().iterator());
+        for (int index = 0; index < snapshot.size(); index++) {
+            tridentCollector.emit(toValues(snapshot.get(index)));
+        }
+    }
+
+    /**
+     * Builds the tuple values for one datum in the order declared by
+     * {@link #getOutputFields()}: timestamp, sequence id, document.
+     */
+    private static List<Object> toValues(StreamsDatum datum) {
+        return Lists.<Object>newArrayList(
+                datum.getTimestamp(),
+                datum.getSequenceid(),
+                datum.getDocument()
+        );
+    }
+
+    /** No-op: acknowledged batches need no bookkeeping here. */
+    @Override
+    public void ack(long l) {
+    }
+
+    /** Stops the wrapped provider. */
+    @Override
+    public void close() {
+        this.provider.stop();
+    }
+
+    /** Returns null; this spout supplies no configuration map. */
+    @Override
+    public Map getComponentConfiguration() {
+        return null;
+    }
+
+    /**
+     * Declares the three output fields.
+     *
+     * NOTE(review): the third field is named "datum" but carries the value of
+     * {@code getDocument()} -- confirm downstream consumers expect that.
+     */
+    @Override
+    public Fields getOutputFields() {
+        final Fields declared = new Fields("timestamp", "sequenceid", "datum");
+        return declared;
+    }
+}
\ No newline at end of file



Mime
View raw message