streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mfrank...@apache.org
Subject [01/10] git commit: STREAMS-71 | Added date parameters
Date Mon, 23 Jun 2014 16:17:02 GMT
Repository: incubator-streams
Updated Branches:
  refs/heads/master 7d6194d36 -> bce1657d4


STREAMS-71 | Added date parameters


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d4202050
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d4202050
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d4202050

Branch: refs/heads/master
Commit: d4202050f8f09e33dd892ceb742b4a5b4b029e54
Parents: ec2ae35
Author: mfranklin <mfranklin@apache.org>
Authored: Fri Jun 13 13:01:26 2014 -0400
Committer: mfranklin <mfranklin@apache.org>
Committed: Fri Jun 13 13:01:26 2014 -0400

----------------------------------------------------------------------
 .../sysomos/provider/SysomosProvider.java       | 39 ++++++++++++++++++--
 1 file changed, 35 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d4202050/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
index 0073ac2..b8b5e56 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.data.util.RFC3339Utils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,10 +52,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  */
 public class SysomosProvider implements StreamsProvider {
 
+
     public static enum Mode { CONTINUOUS, BACKFILL_AND_TERMINATE }
 
     private static final Logger LOGGER = LoggerFactory.getLogger(SysomosProvider.class);
 
+    public static final String ENDING_TIME_KEY = "addedBefore";
+    public static final String STARTING_TIME_KEY = "addedAfter";
     public static final String STREAMS_ID = "SysomosProvider";
     public static final String MODE_KEY = "mode";
     public static final String STARTING_DOCS_KEY = "startingDocs";
@@ -75,6 +79,8 @@ public class SysomosProvider implements StreamsProvider {
     private SysomosConfiguration config;
     private ScheduledExecutorService stream;
     private Map<String, String> documentIds;
+    private Map<String, String> addedBefore;
+    private Map<String, String> addedAfter;
     private Mode mode = Mode.CONTINUOUS;
     private boolean started = false;
 
@@ -118,9 +124,7 @@ public class SysomosProvider implements StreamsProvider {
             LOGGER.trace("Producer not started.  Initializing");
             stream = Executors.newScheduledThreadPool(getConfig().getHeartbeatIds().size()
+ 1);
             for (String heartbeatId : getConfig().getHeartbeatIds()) {
-                Runnable task = documentIds != null && documentIds.containsKey(heartbeatId)
?
-                        new SysomosHeartbeatStream(this, heartbeatId, documentIds.get(heartbeatId))
:
-                        new SysomosHeartbeatStream(this, heartbeatId);
+                Runnable task = createStream(heartbeatId);
                 stream.scheduleWithFixedDelay(task, 0, this.scheduledLatency, TimeUnit.MILLISECONDS);
                 LOGGER.info("Started producer task for heartbeat {}", heartbeatId);
             }
@@ -211,10 +215,23 @@ public class SysomosProvider implements StreamsProvider {
         while (!success);
     }
 
+    protected SysomosHeartbeatStream createStream(String heartbeatId) {
+        String beforeTime = addedAfter != null && addedAfter.containsKey(heartbeatId)
? addedAfter.get(heartbeatId) : null;
+        String afterTime = addedBefore != null && addedBefore.containsKey(heartbeatId)
? addedBefore.get(heartbeatId) : null;
+
+        if(documentIds != null && documentIds.containsKey(heartbeatId)) {
+            return new SysomosHeartbeatStream(this, heartbeatId, documentIds.get(heartbeatId));
+        }
+        if(afterTime != null || beforeTime != null) {
+            return new SysomosHeartbeatStream(this, heartbeatId, RFC3339Utils.parseToUTC(beforeTime),
RFC3339Utils.parseToUTC(afterTime));
+        }
+        return new SysomosHeartbeatStream(this, heartbeatId);
+    }
+
     /**
      * Wait for the queue size to be below threshold before allowing execution to continue
on this thread
      */
-    private void pauseForSpace() {
+    protected void pauseForSpace() {
         while(this.providerQueue.size() >= maxQueued) {
             LOGGER.trace("Sleeping the current thread due to a full queue");
             try {
@@ -242,6 +259,20 @@ public class SysomosProvider implements StreamsProvider {
             }
             this.documentIds = (Map)configIds;
         }
+        if(configMap.containsKey(STARTING_TIME_KEY)) {
+            Object configIds = configMap.get(STARTING_TIME_KEY);
+            if(!(configIds instanceof Map)) {
+                throw new IllegalStateException("Invalid configuration.  Added after key
must be an instance of Map<String,String> but was " + configIds);
+            }
+            this.addedAfter = (Map)configIds;
+        }
+        if(configMap.containsKey(ENDING_TIME_KEY)) {
+            Object configIds = configMap.get(ENDING_TIME_KEY);
+            if(!(configIds instanceof Map)) {
+                throw new IllegalStateException("Invalid configuration.  Added before key
must be an instance of Map<String,String> but was " + configIds);
+            }
+            this.addedBefore = (Map)configIds;
+        }
     }
 
     private Queue<StreamsDatum> constructQueue() {


Mime
View raw message