Return-Path: X-Original-To: apmail-streams-commits-archive@minotaur.apache.org Delivered-To: apmail-streams-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BC33C10AF1 for ; Mon, 23 Jun 2014 16:17:24 +0000 (UTC) Received: (qmail 1232 invoked by uid 500); 23 Jun 2014 16:17:24 -0000 Delivered-To: apmail-streams-commits-archive@streams.apache.org Received: (qmail 1178 invoked by uid 500); 23 Jun 2014 16:17:24 -0000 Mailing-List: contact commits-help@streams.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@streams.incubator.apache.org Delivered-To: mailing list commits@streams.incubator.apache.org Received: (qmail 1158 invoked by uid 99); 23 Jun 2014 16:17:24 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 Jun 2014 16:17:24 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 23 Jun 2014 16:17:23 +0000 Received: (qmail 99847 invoked by uid 99); 23 Jun 2014 16:17:02 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 Jun 2014 16:17:02 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id A128A887E60; Mon, 23 Jun 2014 16:17:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mfranklin@apache.org To: commits@streams.incubator.apache.org Date: Mon, 23 Jun 2014 16:17:02 -0000 Message-Id: <79c2d0db33e84210bfeed9c2d9b8753f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [01/10] git commit: STREAMS-71 | Added date parameters X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-streams Updated Branches: refs/heads/master 7d6194d36 -> bce1657d4 STREAMS-71 | Added date parameters Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d4202050 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d4202050 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d4202050 Branch: refs/heads/master Commit: d4202050f8f09e33dd892ceb742b4a5b4b029e54 Parents: ec2ae35 Author: mfranklin Authored: Fri Jun 13 13:01:26 2014 -0400 Committer: mfranklin Committed: Fri Jun 13 13:01:26 2014 -0400 ---------------------------------------------------------------------- .../sysomos/provider/SysomosProvider.java | 39 ++++++++++++++++++-- 1 file changed, 35 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d4202050/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java index 0073ac2..b8b5e56 100644 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java +++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java @@ -27,6 +27,7 @@ import org.apache.commons.lang.NotImplementedException; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.data.util.RFC3339Utils; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,10 +52,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; */ public class SysomosProvider implements StreamsProvider { + public static enum Mode { CONTINUOUS, BACKFILL_AND_TERMINATE } private static final Logger LOGGER = LoggerFactory.getLogger(SysomosProvider.class); + public static final String ENDING_TIME_KEY = "addedBefore"; + public static final String STARTING_TIME_KEY = "addedAfter"; public static final String STREAMS_ID = "SysomosProvider"; public static final String MODE_KEY = "mode"; public static final String STARTING_DOCS_KEY = "startingDocs"; @@ -75,6 +79,8 @@ public class SysomosProvider implements StreamsProvider { private SysomosConfiguration config; private ScheduledExecutorService stream; private Map documentIds; + private Map addedBefore; + private Map addedAfter; private Mode mode = Mode.CONTINUOUS; private boolean started = false; @@ -118,9 +124,7 @@ public class SysomosProvider implements StreamsProvider { LOGGER.trace("Producer not started. Initializing"); stream = Executors.newScheduledThreadPool(getConfig().getHeartbeatIds().size() + 1); for (String heartbeatId : getConfig().getHeartbeatIds()) { - Runnable task = documentIds != null && documentIds.containsKey(heartbeatId) ? - new SysomosHeartbeatStream(this, heartbeatId, documentIds.get(heartbeatId)) : - new SysomosHeartbeatStream(this, heartbeatId); + Runnable task = createStream(heartbeatId); stream.scheduleWithFixedDelay(task, 0, this.scheduledLatency, TimeUnit.MILLISECONDS); LOGGER.info("Started producer task for heartbeat {}", heartbeatId); } @@ -211,10 +215,23 @@ public class SysomosProvider implements StreamsProvider { while (!success); } + protected SysomosHeartbeatStream createStream(String heartbeatId) { + String beforeTime = addedAfter != null && addedAfter.containsKey(heartbeatId) ? addedAfter.get(heartbeatId) : null; + String afterTime = addedBefore != null && addedBefore.containsKey(heartbeatId) ? addedBefore.get(heartbeatId) : null; + + if(documentIds != null && documentIds.containsKey(heartbeatId)) { + return new SysomosHeartbeatStream(this, heartbeatId, documentIds.get(heartbeatId)); + } + if(afterTime != null || beforeTime != null) { + return new SysomosHeartbeatStream(this, heartbeatId, RFC3339Utils.parseToUTC(beforeTime), RFC3339Utils.parseToUTC(afterTime)); + } + return new SysomosHeartbeatStream(this, heartbeatId); + } + /** * Wait for the queue size to be below threshold before allowing execution to continue on this thread */ - private void pauseForSpace() { + protected void pauseForSpace() { while(this.providerQueue.size() >= maxQueued) { LOGGER.trace("Sleeping the current thread due to a full queue"); try { @@ -242,6 +259,20 @@ public class SysomosProvider implements StreamsProvider { } this.documentIds = (Map)configIds; } + if(configMap.containsKey(STARTING_TIME_KEY)) { + Object configIds = configMap.get(STARTING_TIME_KEY); + if(!(configIds instanceof Map)) { + throw new IllegalStateException("Invalid configuration. Added after key must be an instance of Map but was " + configIds); + } + this.addedAfter = (Map)configIds; + } + if(configMap.containsKey(ENDING_TIME_KEY)) { + Object configIds = configMap.get(ENDING_TIME_KEY); + if(!(configIds instanceof Map)) { + throw new IllegalStateException("Invalid configuration. Added before key must be an instance of Map but was " + configIds); + } + this.addedBefore = (Map)configIds; + } } private Queue constructQueue() {