streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sblack...@apache.org
Subject svn commit: r1557268 - in /incubator/streams/branches/sblackmon: streams-contrib/ streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/ streams-co...
Date Fri, 10 Jan 2014 22:03:10 GMT
Author: sblackmon
Date: Fri Jan 10 22:03:09 2014
New Revision: 1557268

URL: http://svn.apache.org/r1557268
Log:
small updates to core and persist implementations

Modified:
    incubator/streams/branches/sblackmon/streams-contrib/pom.xml
    incubator/streams/branches/sblackmon/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java
    incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
    incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaConfiguration.json
    incubator/streams/branches/sblackmon/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
    incubator/streams/branches/sblackmon/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java
    incubator/streams/branches/sblackmon/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
    incubator/streams/branches/sblackmon/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
    incubator/streams/branches/sblackmon/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
    incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
    incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java
    incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java
    incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java
    incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java

Modified: incubator/streams/branches/sblackmon/streams-contrib/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/sblackmon/streams-contrib/pom.xml?rev=1557268&r1=1557267&r2=1557268&view=diff
==============================================================================
--- incubator/streams/branches/sblackmon/streams-contrib/pom.xml (original)
+++ incubator/streams/branches/sblackmon/streams-contrib/pom.xml Fri Jan 10 22:03:09 2014
@@ -36,7 +36,7 @@
     </properties>
 
     <modules>
-        <!--<module>streams-persist-console</module>-->
+        <module>streams-persist-console</module>
         <module>streams-persist-kafka</module>
         <module>streams-provider-datasift</module>
         <module>streams-provider-facebook</module>

Modified: incubator/streams/branches/sblackmon/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/sblackmon/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java?rev=1557268&r1=1557267&r2=1557268&view=diff
==============================================================================
--- incubator/streams/branches/sblackmon/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java (original)
+++ incubator/streams/branches/sblackmon/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java Fri Jan 10 22:03:09 2014
@@ -31,21 +31,24 @@ public class ConsolePersistWriterTask im
 
     private ConsolePersistWriter writer;
 
-    public ConsolePersistWriterTask(ConsolePersistWriter writer,
-                                    BlockingQueue<Object> outqueue) {
+    public ConsolePersistWriterTask(ConsolePersistWriter writer) {
         this.writer = writer;
     }
 
     @Override
     public void run() {
         while(true) {
+            if( writer.getPersistQueue().peek() != null ) {
+                try {
+                    StreamsDatum entry = writer.getPersistQueue().remove();
+                    writer.write(entry);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
             try {
-                StreamsDatum entry = writer.persistQueue.remove();
-                writer.write(entry);
                 Thread.sleep(new Random().nextInt(100));
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
+            } catch (InterruptedException e) {}
         }
     }
 

Modified: incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java?rev=1557268&r1=1557267&r2=1557268&view=diff
==============================================================================
--- incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java (original)
+++ incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java Fri Jan 10 22:03:09 2014
@@ -28,7 +28,7 @@ public class KafkaPersistWriter implemen
 
     private KafkaConfiguration config;
 
-    private Producer<String, StreamsDatum> producer;
+    private Producer<String, String> producer;
 
     public KafkaPersistWriter() {
         Config config = StreamsConfigurator.config.getConfig("kafka");
@@ -63,7 +63,7 @@ public class KafkaPersistWriter implemen
 
         ProducerConfig config = new ProducerConfig(props);
 
-        producer = new Producer<String, StreamsDatum>(config);
+        producer = new Producer<String, String>(config);
     }
 
     @Override
@@ -72,6 +72,16 @@ public class KafkaPersistWriter implemen
     }
 
     @Override
+    public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+        this.persistQueue = persistQueue;
+    }
+
+    @Override
+    public Queue<StreamsDatum> getPersistQueue() {
+        return this.persistQueue;
+    }
+
+    @Override
     public void write(StreamsDatum entry) {
 
         try {
@@ -79,7 +89,7 @@ public class KafkaPersistWriter implemen
 
             String hash = GuidUtils.generateGuid(text);
 
-            KeyedMessage<String, StreamsDatum> data = new KeyedMessage<String, StreamsDatum>(config.getTopic(), hash, entry);
+            KeyedMessage<String, String> data = new KeyedMessage<String, String>(config.getTopic(), hash, text);
 
             producer.send(data);
 

Modified: incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaConfiguration.json
URL: http://svn.apache.org/viewvc/incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaConfiguration.json?rev=1557268&r1=1557267&r2=1557268&view=diff
==============================================================================
--- incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaConfiguration.json (original)
+++ incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaConfiguration.json Fri Jan 10 22:03:09 2014
@@ -9,9 +9,13 @@
             "type": "string",
             "description": "A comma-delimited list of broker nodes"
         },
+        "zkconnect": {
+            "type": "string",
+            "description": "A comma-delimited list of zookeeper host:ports"
+        },
         "topic": {
             "type": "string",
-            "description": "A topic to write to by default"
+            "description": "A topic to read/write from"
         }
     }
 }
\ No newline at end of file

Modified: incubator/streams/branches/sblackmon/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/sblackmon/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java?rev=1557268&r1=1557267&r2=1557268&view=diff
==============================================================================
--- incubator/streams/branches/sblackmon/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java (original)
+++ incubator/streams/branches/sblackmon/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java Fri Jan 10 22:03:09 2014
@@ -1,6 +1,7 @@
 package org.apache.streams.data.moreover;
 
 import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.moreover.MoreoverConfiguration;
@@ -19,7 +20,9 @@ import java.util.concurrent.*;
 public class MoreoverProvider implements StreamsProvider {
 
     private static Logger logger = LoggerFactory.getLogger(MoreoverProvider.class);
-    private volatile Queue<StreamsResultSet> resultQueue = new ConcurrentLinkedQueue<StreamsResultSet>();
+
+    protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+
     private List<ExecutorService> tasks = new LinkedList<ExecutorService>();
     private List<MoreoverKeyData> keys;
     private boolean started = false;
@@ -42,7 +45,7 @@ public class MoreoverProvider implements
         if(!started) {
             logger.trace("Producer not started.  Initializing");
             for(MoreoverKeyData key : keys) {
-                MoreoverProviderTask task = new MoreoverProviderTask(key.getId(), key.getKey(), this.resultQueue, key.getStartingSequence());
+                MoreoverProviderTask task = new MoreoverProviderTask(key.getId(), key.getKey(), this.providerQueue, key.getStartingSequence());
                 ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
                 service.scheduleWithFixedDelay(task, 0, MoreoverProviderTask.LATENCY, TimeUnit.SECONDS);
                 logger.info("Started producer for {} with service {}", key.getKey(), service.toString());
@@ -60,8 +63,13 @@ public class MoreoverProvider implements
     }
 
     @Override
+    public Queue<StreamsDatum> getProviderQueue() {
+        return providerQueue;
+    }
+
+    @Override
     public StreamsResultSet readCurrent() {
-        return resultQueue.peek();
+        return null;
     }
 
     @Override

Modified: incubator/streams/branches/sblackmon/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/sblackmon/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java?rev=1557268&r1=1557267&r2=1557268&view=diff
==============================================================================
--- incubator/streams/branches/sblackmon/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java (original)
+++ incubator/streams/branches/sblackmon/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java Fri Jan 10 22:03:09 2014
@@ -1,5 +1,8 @@
 package org.apache.streams.data.moreover;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsResultSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -18,11 +21,11 @@ public class MoreoverProviderTask implem
     private final String lastSequence;
     private final String apiKey;
     private final String apiId;
-    private final Queue<StreamsResultSet> results;
+    private final Queue<StreamsDatum> results;
     private final MoreoverClient moClient;
     private boolean started = false;
 
-    public MoreoverProviderTask(String apiId, String apiKey, Queue<StreamsResultSet> results, String lastSequence) {
+    public MoreoverProviderTask(String apiId, String apiKey, Queue<StreamsDatum> results, String lastSequence) {
         //logger.info("Constructed new task {} for {} {} {}", UUID.randomUUID().toString(), apiId, apiKey, lastSequence);
         this.apiId = apiId;
         this.apiKey = apiKey;
@@ -38,7 +41,8 @@ public class MoreoverProviderTask implem
             ensureTime(moClient);
             MoreoverResult result = started ? moClient.getNextBatch() : moClient.getArticlesAfter(lastSequence, 500);
             started = true;
-            results.offer(new MoreoverResultSetWrapper(result));
+            for(StreamsDatum entry : ImmutableSet.copyOf(result.iterator()))
+                results.offer(entry);
             logger.info("ApiKey={}\tlastSequenceid={}", this.apiKey, result.getMaxSequencedId());
         } catch (Exception e) {
             logger.error("Exception while polling moreover", e);

Modified: incubator/streams/branches/sblackmon/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/sblackmon/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java?rev=1557268&r1=1557267&r2=1557268&view=diff
==============================================================================
--- incubator/streams/branches/sblackmon/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java (original)
+++ incubator/streams/branches/sblackmon/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java Fri Jan 10 22:03:09 2014
@@ -10,6 +10,7 @@ import com.fasterxml.jackson.dataformat.
 import com.fasterxml.jackson.dataformat.xml.XmlFactory;
 import com.fasterxml.jackson.dataformat.xml.XmlMapper;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.moreover.api.Article;
 import org.apache.streams.core.StreamsDatum;
 import org.slf4j.Logger;
@@ -19,9 +20,11 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.Iterator;
+import java.util.List;
 
 
 public class MoreoverResult implements Iterable<StreamsDatum> {
+
     private static final Logger logger = LoggerFactory.getLogger(MoreoverClient.class);
 
     private ObjectMapper mapper;
@@ -36,6 +39,7 @@ public class MoreoverResult implements I
     private String clientId;
     private BigInteger maxSequencedId = BigInteger.ZERO;
 
+    private List<StreamsDatum> list = Lists.newArrayList();
 
     protected MoreoverResult(String clientId, String xmlString, long start, long end) {
         this.xmlString = xmlString;
@@ -98,10 +102,11 @@ public class MoreoverResult implements I
 
         for (JsonNode articleNode : ImmutableList.copyOf(articlesArray.elements())) {
             Article article = mapper.convertValue(articleNode, Article.class);
-            BigInteger temp = new BigInteger(article.getSequenceId());
-            logger.trace("Prior max sequence Id {} current candidate {}", this.maxSequencedId, temp);
-            if (temp.compareTo(this.maxSequencedId) > 0) {
-                this.maxSequencedId = temp;
+            BigInteger sequenceid = new BigInteger(article.getSequenceId());
+            list.add(new StreamsDatum(article, sequenceid));
+            logger.trace("Prior max sequence Id {} current candidate {}", this.maxSequencedId, sequenceid);
+            if (sequenceid.compareTo(this.maxSequencedId) > 0) {
+                this.maxSequencedId = sequenceid;
                 logger.debug("New max sequence Id {}", this.maxSequencedId);
             }
 
@@ -118,11 +123,8 @@ public class MoreoverResult implements I
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     public Iterator<StreamsDatum> iterator() {
-        // TODO: I don't understand the purpose of this class.  destroy
-        //return new JsonStringIterator(articlesArray.iterator());
-        return null;
+        return list.iterator();
     }
 
     protected static class JsonStringIterator implements Iterator<Serializable> {

Modified: incubator/streams/branches/sblackmon/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/sblackmon/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java?rev=1557268&r1=1557267&r2=1557268&view=diff
==============================================================================
--- incubator/streams/branches/sblackmon/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java (original)
+++ incubator/streams/branches/sblackmon/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java Fri Jan 10 22:03:09 2014
@@ -4,6 +4,7 @@ import com.fasterxml.jackson.core.JsonPa
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Retweet;
@@ -15,6 +16,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.Serializable;
+import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.BlockingQueue;
 
@@ -28,7 +31,7 @@ public class TwitterEventProcessor imple
     private ObjectMapper mapper = new ObjectMapper();
 
     private BlockingQueue<String> inQueue;
-    private BlockingQueue<Object> outQueue;
+    private Queue<StreamsDatum> outQueue;
 
     private Class inClass;
     private Class outClass;
@@ -39,14 +42,14 @@ public class TwitterEventProcessor imple
 
     public final static String TERMINATE = new String("TERMINATE");
 
-    public TwitterEventProcessor(BlockingQueue<String> inQueue, BlockingQueue<Object> outQueue, Class inClass, Class outClass) {
+    public TwitterEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class inClass, Class outClass) {
         this.inQueue = inQueue;
         this.outQueue = outQueue;
         this.inClass = inClass;
         this.outClass = outClass;
     }
 
-    public TwitterEventProcessor(BlockingQueue<String> inQueue, BlockingQueue<Object> outQueue, Class outClass) {
+    public TwitterEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class outClass) {
         this.inQueue = inQueue;
         this.outQueue = outQueue;
         this.outClass = outClass;
@@ -72,13 +75,13 @@ public class TwitterEventProcessor imple
 
                 // if the target is string, just pass-through
                 if( java.lang.String.class.equals(outClass))
-                    outQueue.offer(item);
+                    outQueue.offer(new StreamsDatum(item));
                 else {
                     // convert to desired format
                     Object out = convert(node, inClass, outClass);
 
                     if( out != null && validate(out, outClass))
-                        outQueue.offer(out);
+                        outQueue.offer(new StreamsDatum(out));
                 }
 
             } catch (Exception e) {

Modified: incubator/streams/branches/sblackmon/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/sblackmon/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java?rev=1557268&r1=1557267&r2=1557268&view=diff
==============================================================================
--- incubator/streams/branches/sblackmon/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java (original)
+++ incubator/streams/branches/sblackmon/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java Fri Jan 10 22:03:09 2014
@@ -16,6 +16,7 @@ import com.twitter.hbc.httpclient.auth.A
 import com.twitter.hbc.httpclient.auth.OAuth1;
 import com.typesafe.config.Config;
 import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.twitter.TwitterStreamConfiguration;
@@ -26,12 +27,13 @@ import org.slf4j.LoggerFactory;
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.*;
 
 /**
  * Created by sblackmon on 12/10/13.
  */
-public class TwitterStreamProvider /*extends BaseRichSpout*/ implements StreamsProvider, Serializable, Runnable {
+public class TwitterStreamProvider /*extends BaseRichSpout*/ implements StreamsProvider, Serializable {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProvider.class);
 
@@ -47,18 +49,18 @@ public class TwitterStreamProvider /*ext
         this.config = config;
     }
 
-    BlockingQueue<String> inQueue = new LinkedBlockingQueue<String>(10000);
+    protected BlockingQueue inQueue = new LinkedBlockingQueue<String>(10000);
 
-    private StreamingEndpoint endpoint;
-    private BasicClient client;
+    protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
 
-    public BlockingQueue<Object> getOutQueue() {
-        return outQueue;
-    }
+    protected StreamingEndpoint endpoint;
+    protected BasicClient client;
 
-    BlockingQueue<Object> outQueue = new LinkedBlockingQueue<Object>(10000);
+    public BlockingQueue<Object> getInQueue() {
+        return inQueue;
+    }
 
-    private ListeningExecutorService executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+    protected ListeningExecutorService executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
 
     private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
         return new ThreadPoolExecutor(nThreads, nThreads,
@@ -86,7 +88,8 @@ public class TwitterStreamProvider /*ext
         this.klass = klass;
     }
 
-    private void setup() {
+    @Override
+    public void start() {
 
         Preconditions.checkNotNull(this.klass);
 
@@ -123,29 +126,23 @@ public class TwitterStreamProvider /*ext
                 .processor(new StringDelimitedProcessor(inQueue))
                 .build();
 
-    }
-
-    @Override
-    public void run() {
-
-        setup();
-
         for (int i = 0; i < 10; i++) {
-            executor.submit(new TwitterEventProcessor(inQueue, outQueue, klass));
+            executor.submit(new TwitterEventProcessor(inQueue, providerQueue, klass));
         }
 
-        client.connect();
-
+        new Thread(new TwitterStreamProviderTask(this)).start();
     }
 
     @Override
-    public void start() {
-
+    public void stop() {
+        for (int i = 0; i < 10; i++) {
+            inQueue.add(TwitterEventProcessor.TERMINATE);
+        }
     }
 
     @Override
-    public void stop() {
-
+    public Queue<StreamsDatum> getProviderQueue() {
+        return this.providerQueue;
     }
 
     @Override

Modified: incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java?rev=1557268&r1=1557267&r2=1557268&view=diff
==============================================================================
--- incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java (original)
+++ incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java Fri Jan 10 22:03:09 2014
@@ -27,7 +27,29 @@ import java.util.Map;
 /**
  * Created by sblackmon on 1/2/14.
  */
-public abstract class StreamsDatum {
+public class StreamsDatum implements Serializable {
+
+    public StreamsDatum(Object document) {
+        this.document = document;
+    }
+
+    public StreamsDatum(Object document, BigInteger sequenceid) {
+
+        this.document = document;
+        this.sequenceid = sequenceid;
+    }
+
+    public StreamsDatum(Object document, DateTime timestamp) {
+
+        this.document = document;
+        this.timestamp = timestamp;
+    }
+
+    public StreamsDatum(Object document, DateTime timestamp, BigInteger sequenceid) {
+        this.document = document;
+        this.timestamp = timestamp;
+        this.sequenceid = sequenceid;
+    }
 
     public DateTime timestamp;
 
@@ -35,7 +57,7 @@ public abstract class StreamsDatum {
 
     public Map<String, Object> metadata;
 
-    public Serializable document;
+    public Object document;
 
     public DateTime getTimestamp() {
         return timestamp;
@@ -61,11 +83,11 @@ public abstract class StreamsDatum {
         this.metadata = metadata;
     }
 
-    public Serializable getDocument() {
+    public Object getDocument() {
         return document;
     }
 
-    public void setDocument(Serializable document) {
+    public void setDocument(Object document) {
         this.document = document;
     }
 }

Modified: incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java?rev=1557268&r1=1557267&r2=1557268&view=diff
==============================================================================
--- incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java (original)
+++ incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java Fri Jan 10 22:03:09 2014
@@ -20,7 +20,9 @@ package org.apache.streams.core;
 
 import org.joda.time.DateTime;
 
+import java.io.Serializable;
 import java.math.BigInteger;
+import java.util.Queue;
 
 /**
  * Created by sblackmon on 12/13/13.
@@ -30,6 +32,9 @@ public interface StreamsPersistReader {
     void start();
     void stop();
 
+    public void setPersistQueue(Queue<StreamsDatum> persistQueue);
+    public Queue<StreamsDatum> getPersistQueue();
+
     public StreamsResultSet readAll();
     public StreamsResultSet readNew(BigInteger sequence);
     public StreamsResultSet readRange(DateTime start, DateTime end);

Modified: incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java?rev=1557268&r1=1557267&r2=1557268&view=diff
==============================================================================
--- incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java (original)
+++ incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java Fri Jan 10 22:03:09 2014
@@ -18,6 +18,9 @@
 
 package org.apache.streams.core;
 
+import java.io.Serializable;
+import java.util.Queue;
+
 /**
  * Created by sblackmon on 12/13/13.
  */
@@ -26,6 +29,9 @@ public interface StreamsPersistWriter {
     void start();
     void stop();
 
+    public void setPersistQueue(Queue<StreamsDatum> persistQueue);
+    public Queue<StreamsDatum> getPersistQueue();
+
     public void write( StreamsDatum entry );
 
 }

Modified: incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java?rev=1557268&r1=1557267&r2=1557268&view=diff
==============================================================================
--- incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java (original)
+++ incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java Fri Jan 10 22:03:09 2014
@@ -18,6 +18,9 @@
 
 package org.apache.streams.core;
 
+import java.io.Serializable;
+import java.util.Queue;
+
 /**
  * Created by sblackmon on 12/13/13.
  */
@@ -26,6 +29,12 @@ public interface StreamsProcessor {
     void start();
     void stop();
 
+    public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue);
+    public Queue<StreamsDatum> getProcessorInputQueue();
+
+    public void setProcessorOutputQueue(Queue<StreamsDatum> outputQueue);
+    public Queue<StreamsDatum> getProcessorOutputQueue();
+
     public StreamsDatum process( StreamsDatum entry );
 
 }

Modified: incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java?rev=1557268&r1=1557267&r2=1557268&view=diff
==============================================================================
--- incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
(original)
+++ incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
Fri Jan 10 22:03:09 2014
@@ -20,7 +20,9 @@ package org.apache.streams.core;
 
 import org.joda.time.DateTime;
 
+import java.io.Serializable;
 import java.math.BigInteger;
+import java.util.Queue;
 
 /**
  * Created by sblackmon on 12/13/13.
@@ -30,8 +32,11 @@ public interface StreamsProvider {
     void start();
     void stop();
 
+    public Queue<StreamsDatum> getProviderQueue();
+
     public StreamsResultSet readCurrent();
     public StreamsResultSet readNew(BigInteger sequence);
     public StreamsResultSet readRange(DateTime start, DateTime end);
 
+
 }



Mime
View raw message