Return-Path: X-Original-To: apmail-streams-commits-archive@minotaur.apache.org Delivered-To: apmail-streams-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4C14F103CA for ; Fri, 21 Mar 2014 23:21:26 +0000 (UTC) Received: (qmail 4564 invoked by uid 500); 21 Mar 2014 23:21:03 -0000 Delivered-To: apmail-streams-commits-archive@streams.apache.org Received: (qmail 4160 invoked by uid 500); 21 Mar 2014 23:20:53 -0000 Mailing-List: contact commits-help@streams.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@streams.incubator.apache.org Delivered-To: mailing list commits@streams.incubator.apache.org Received: (qmail 3764 invoked by uid 99); 21 Mar 2014 23:20:44 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Mar 2014 23:20:44 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 21 Mar 2014 23:20:16 +0000 Received: (qmail 1898 invoked by uid 99); 21 Mar 2014 23:19:40 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Mar 2014 23:19:40 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id AACD59484A3; Fri, 21 Mar 2014 23:19:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sblackmon@apache.org To: commits@streams.incubator.apache.org Date: Fri, 21 Mar 2014 23:19:44 -0000 Message-Id: In-Reply-To: <40ecbaa02adb432a86296ac778381261@git.apache.org> References: <40ecbaa02adb432a86296ac778381261@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [06/71] [abbrv] fixing STREAMS-26 branch X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-core/pom.xml ---------------------------------------------------------------------- diff --git a/trunk/streams-core/pom.xml b/trunk/streams-core/pom.xml deleted file mode 100644 index 8a35b24..0000000 --- a/trunk/streams-core/pom.xml +++ /dev/null @@ -1,58 +0,0 @@ - - - - - - streams-project - org.apache.streams - 0.1-SNAPSHOT - - 4.0.0 - streams-core - jar - - - - joda-time - joda-time - - - org.apache.streams - streams-util - ${project.version} - - - - - src/main/java - src/test/java - - - src/main/resources - - - - - src/test/resources - - - - http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-core/src/main/java/org/apache/streams/core/StreamHandler.java ---------------------------------------------------------------------- diff --git a/trunk/streams-core/src/main/java/org/apache/streams/core/StreamHandler.java b/trunk/streams-core/src/main/java/org/apache/streams/core/StreamHandler.java deleted file mode 100644 index 5ad4f7a..0000000 --- a/trunk/streams-core/src/main/java/org/apache/streams/core/StreamHandler.java +++ /dev/null @@ -1,22 +0,0 @@ -package org.apache.streams.core; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Created by sblackmon on 1/6/14. - */ -public class StreamHandler { - - private static final Logger LOGGER = LoggerFactory.getLogger(StreamHandler.class); - - private volatile StreamState state; - - public void setState(StreamState state) { - this.state = state; - } - - public StreamState getState() { - return this.state; - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-core/src/main/java/org/apache/streams/core/StreamState.java ---------------------------------------------------------------------- diff --git a/trunk/streams-core/src/main/java/org/apache/streams/core/StreamState.java b/trunk/streams-core/src/main/java/org/apache/streams/core/StreamState.java deleted file mode 100644 index 90b6d23..0000000 --- a/trunk/streams-core/src/main/java/org/apache/streams/core/StreamState.java +++ /dev/null @@ -1,13 +0,0 @@ -package org.apache.streams.core; - -/** - * Created by sblackmon on 1/6/14. - */ -public enum StreamState { - RUNNING, //Stream is currently connected and running - STOPPED, // Stream has been shut down and is stopped - CONNECTING, //Stream is attempting to connect to server - SHUTTING_DOWN, //Stream has initialized shutdown - DISCONNECTED //Stream has unintentionally lost connection -} - http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java ---------------------------------------------------------------------- diff --git a/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java b/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java deleted file mode 100644 index f88275b..0000000 --- a/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.core; - -import org.joda.time.DateTime; - -import java.io.Serializable; -import java.math.BigInteger; -import java.util.HashMap; -import java.util.Map; - -/** - * Created by sblackmon on 1/2/14. - */ -public class StreamsDatum implements Serializable { - - public StreamsDatum(Object document) { - this.document = document; - this.metadata = new HashMap(); - } - - public StreamsDatum(Object document, BigInteger sequenceid) { - - this.document = document; - this.sequenceid = sequenceid; - this.metadata = new HashMap(); - } - - public StreamsDatum(Object document, DateTime timestamp) { - - this.document = document; - this.timestamp = timestamp; - this.metadata = new HashMap(); - } - - public StreamsDatum(Object document, DateTime timestamp, BigInteger sequenceid) { - this.document = document; - this.timestamp = timestamp; - this.sequenceid = sequenceid; - this.metadata = new HashMap(); - } - - public DateTime timestamp; - - public BigInteger sequenceid; - - public Map metadata; - - public Object document; - - public DateTime getTimestamp() { - return timestamp; - } - - public void setTimestamp(DateTime timestamp) { - this.timestamp = timestamp; - } - - public BigInteger getSequenceid() { - return sequenceid; - } - - public void setSequenceid(BigInteger sequenceid) { - this.sequenceid = sequenceid; - } - - public Map getMetadata() { - return metadata; - } - - public void setMetadata(Map metadata) { - this.metadata = metadata; - } - - public Object getDocument() { - return document; - } - - public void setDocument(Object document) { - this.document = document; - } - - @Override - public boolean equals(Object o) { - if(o instanceof StreamsDatum) { - StreamsDatum that = (StreamsDatum) o; - if(this.document != null && this.document.equals(that.document)) { - return (this.timestamp != null ? this.timestamp.equals(that.timestamp) : that.timestamp == null) && - (this.sequenceid != null ? this.sequenceid.equals(that.sequenceid) : that.sequenceid == null); - } - else { - return that.document == null && this.document == null; - } - } - else { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsFilter.java ---------------------------------------------------------------------- diff --git a/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsFilter.java b/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsFilter.java deleted file mode 100644 index 11e9539..0000000 --- a/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsFilter.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.core; - -import java.util.List; -import java.util.Queue; - -/** - * Created by sblackmon on 12/13/13. - */ -public interface StreamsFilter { - - void start(); - void stop(); - - public void setProcessorInputQueue(Queue inputQueue); - public Queue getProcessorInputQueue(); - - public void setProcessorOutputQueue(Queue outputQueue); - public Queue getProcessorOutputQueue(); - - public boolean filter(StreamsDatum entry); - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java ---------------------------------------------------------------------- diff --git a/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java b/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java deleted file mode 100644 index 6fb703c..0000000 --- a/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.core; - -import org.joda.time.DateTime; - -import java.io.Serializable; -import java.math.BigInteger; -import java.util.Queue; - -/** - * - * Currently a duplicate interface. Has exact same methods as StreamsProvider. - * Future work should make this interface necessary I'm told. - * - */ -public interface StreamsPersistReader extends StreamsProvider { - -// void start(); -// void stop(); -// -// public void setPersistQueue(Queue persistQueue); -// public Queue getPersistQueue(); - -// public StreamsResultSet readAll(); -// public StreamsResultSet readNew(BigInteger sequence); -// public StreamsResultSet readRange(DateTime start, DateTime end); - - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java ---------------------------------------------------------------------- diff --git a/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java b/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java deleted file mode 100644 index 3f7a79c..0000000 --- a/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.core; - -import java.io.Serializable; -import java.util.Queue; - -/** - * Created by sblackmon on 12/13/13. - */ -public interface StreamsPersistWriter extends StreamsOperation{ - - /** - * Persist the StreamsDatum to the corresponding data store. - * @param entry to be stored. - */ - public void write( StreamsDatum entry ); - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java ---------------------------------------------------------------------- diff --git a/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java b/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java deleted file mode 100644 index d61c1e5..0000000 --- a/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.core; - -import java.util.List; -import java.util.Queue; - -/** - * Created by sblackmon on 12/13/13. - */ -public interface StreamsProcessor extends StreamsOperation{ - - - - /** - * Process/Analyze the {@link org.apache.streams.core.StreamsDatum} and return the the StreamsDatums that will - * passed to every down stream operation that reads from this processor. - * @param entry StreamsDatum to be process - * @return resulting StreamDatums from process. Should never be null or contain null object. Empty list OK. - */ - public List process( StreamsDatum entry ); - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java ---------------------------------------------------------------------- diff --git a/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java b/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java deleted file mode 100644 index 56878a7..0000000 --- a/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.core; - -import org.joda.time.DateTime; - -import java.math.BigInteger; -import java.util.Queue; - -/** - * Created by sblackmon on 12/13/13. - */ -public interface StreamsProvider extends StreamsOperation { - -// void start(); -// void stop(); -// -// public Queue getProviderQueue(); - - public StreamsResultSet readCurrent(); - public StreamsResultSet readNew(BigInteger sequence); - public StreamsResultSet readRange(DateTime start, DateTime end); - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsResultSet.java ---------------------------------------------------------------------- diff --git a/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsResultSet.java b/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsResultSet.java deleted file mode 100644 index 02bd368..0000000 --- a/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsResultSet.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.core; - -import java.math.BigInteger; - -public interface StreamsResultSet extends Iterable { - /** - * Get the time that the result set started collecting - * @return long representing time since epoch - */ - long getStartTime(); - - /** - * Get the time that the result set stopped collecting - * @return long representing time since epoch - */ - long getEndTime(); - - /** - * Get the source Unique identifier - * @return String id - */ - String getSourceId(); - - /** - * Get the maximum id of the items in the result set - * @return the max sequence ID - */ - BigInteger getMaxSequence(); -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-eip-routes/ReadMe.txt ---------------------------------------------------------------------- diff --git a/trunk/streams-eip-routes/ReadMe.txt b/trunk/streams-eip-routes/ReadMe.txt deleted file mode 100644 index 19a1d19..0000000 --- a/trunk/streams-eip-routes/ReadMe.txt +++ /dev/null @@ -1,32 +0,0 @@ -Camel Router WAR Project with Web Console and REST Support -========================================================== - -This project bundles the Camel Web Console, REST API, and some -sample routes as a WAR. You can build the WAR by running - -mvn install - -You can then run the project by dropping the WAR into your -favorite web container or just run - -mvn jetty:run - -to start up and deploy to Jetty. - - -Web Console -=========== - -You can view the Web Console by pointing your browser to http://localhost:8080/ - -You should be able to do things like - - * browse the available endpoints - * browse the messages on an endpoint if it is a BrowsableEndpoint - * send a message to an endpoint - * create new endpoints - -For more help see the Apache Camel documentation - - http://camel.apache.org/ - http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-eip-routes/pom.xml ---------------------------------------------------------------------- diff --git a/trunk/streams-eip-routes/pom.xml b/trunk/streams-eip-routes/pom.xml deleted file mode 100644 index 78d0767..0000000 --- a/trunk/streams-eip-routes/pom.xml +++ /dev/null @@ -1,209 +0,0 @@ - - - - - 4.0.0 - - - org.apache.streams - streams-project - 0.1-SNAPSHOT - - - streams-eip-routes - - ${bundle.symbolicName} [${bundle.namespace}] - - - streams-eip-routes - org.apache.streams - 1.9.11 - - - bundle - - - - clojars.org - http://clojars.org/repo - - - - - - - - src/main/resources - - - - . - - plugin.xml - plugin.properties - icons/** - META-INF/* - - - - - - org.ops4j - maven-pax-plugin - - true - - - org.apache.felix - maven-bundle-plugin - 1.4.3 - - - - ${bundle.symbolicName} - ${project.version} - ${bundle.namespace};version="${project.version}",org.apache.streams.messaging.configuration,org.apache.streams.messaging.routers,org.apache.streams.messaging.rules,org.apache.streams.messaging.processors,org.apache.streams.messaging.aggregation,org.apache.streams.messaging.service, org.apache.streams.messaging.storm,org.apache.activemq,org.codehaus.jackson.*;version="${jackson.version}" - ${bundle.namespace}.messaging.routers.impl.*,${bundle.namespace}.messaging.rules.impl.*, ${bundle.namespace}.messaging.service.impl.* - org.apache.camel.*;version="2.8.5",org.apache.streams.messaging.configuration,org.apache.activemq.camel.component,org.apache.activemq,org.apache.activemq.pool,org.apache.camel.component.jms,org.springframework.*;version="3.0.6.RELEASE",org.apache.commons.logging,org.apache.streams.*,org.apache.streams.osgi.components,org.apache.streams.osgi.components.activityconsumer,org.apache.streams.osgi.components.activityconsumer.impl,org.apache.streams.osgi.components.activitysubscriber,org.apache.streams.osgi.components.activitysubscriber.impl,org.apache.streams.messaging.processors,org.apache.streams.messaging.aggregation,javax.jms, javax.net.ssl, javax.transaction.xa, org.apache.activemq.advisory, org.apache.activemq.blob, org.apache.activemq.broker, org.apache.activemq.broker.region, org.apache.activemq.command, org.apache.activemq.filter, org.apache.activemq.jndi, org.apache.activemq.management, org.apache.activemq.selector, org.apache.activemq.sta te, org.apache.activemq.thread, org.apache.activemq.transaction, org.apache.activemq.transport, org.apache.activemq.transport.failover, org.apache.activemq.transport.tcp, org.apache.activemq.usage, org.apache.activemq.util, org.slf4j,org.codehaus.jackson;version="${jackson.version}",javax.xml.datatype, javax.xml.namespace, javax.xml.parsers, org.joda.time, org.joda.time.format, org.w3c.dom, org.w3c.dom.bootstrap, org.w3c.dom.ls, org.xml.sax, org.apache.rave.model, org.apache.rave.portal.model.impl, backtype.storm, backtype.storm.coordination, backtype.storm.generated, backtype.storm.spout, backtype.storm.task, backtype.storm.topology, backtype.storm.topology.base, backtype.storm.tuple, javax.annotation, backtype.storm.utils - - - - - - - - - org.apache.camel - camel-core - 2.9.0 - - - - org.apache.activemq - activemq-camel - 5.5.1 - - - - org.apache.activemq - activemq-pool - 5.5.1 - - - - org.springframework - spring-core - 3.0.6.RELEASE - - - - org.springframework - spring-context - 3.0.6.RELEASE - - - - org.codehaus.jackson - jackson-mrbean - ${jackson.old.version} - - - - org.codehaus.jackson - jackson-mapper-asl - ${jackson.old.version} - - - - org.osgi - osgi_R4_core - 1.0 - provided - true - - - - org.osgi - osgi_R4_compendium - 1.0 - provided - true - - - - - org.apache.streams.osgi.components - activity-registration - ${project.version} - - - - org.apache.streams.osgi.components - activity-consumer - ${project.version} - - - - org.apache.streams.osgi.components - activity-subscriber - ${project.version} - - - - org.apache.streams - streams-cassandra - ${project.version} - - - - org.apache.rave - rave-core - ${rave.version} - - - - org.apache.rave - rave-core-api - ${rave.version} - - - - storm - storm - 0.8.2 - - - - org.easymock - easymock - 3.1 - - - - \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java ---------------------------------------------------------------------- diff --git a/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java b/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java deleted file mode 100644 index dc7ba0c..0000000 --- a/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.messaging.aggregation; - - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.streams.messaging.service.impl.CassandraActivityService; -import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriber; -import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriberWarehouse; -import org.springframework.scheduling.annotation.Scheduled; - -import java.util.*; - -public class ActivityAggregator { - - private ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse; - private CassandraActivityService activityService; - private static final transient Log LOG = LogFactory.getLog(ActivityAggregator.class); - - public void setActivityStreamsSubscriberWarehouse(ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse) { - this.activityStreamsSubscriberWarehouse = activityStreamsSubscriberWarehouse; - } - - public void setActivityService(CassandraActivityService activityService) { - this.activityService = activityService; - } - - @Scheduled(fixedRate=30000) - public void distributeToSubscribers() { - for (ActivityStreamsSubscriber subscriber : activityStreamsSubscriberWarehouse.getAllSubscribers()) { - updateSubscriber(subscriber); - } - } - - public void updateSubscriber(ActivityStreamsSubscriber subscriber){ - Set activities = new TreeSet(); - activities.addAll(activityService.getActivitiesForFilters(subscriber.getActivityStreamsSubscriberConfiguration().getFilters(), subscriber.getLastUpdated())); - //TODO: an activity posted in between the cql query and setting the lastUpdated field will be lost - subscriber.setLastUpdated(new Date()); - subscriber.receive(new ArrayList(activities)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/configuration/EipConfigurator.java ---------------------------------------------------------------------- diff --git a/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/configuration/EipConfigurator.java b/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/configuration/EipConfigurator.java deleted file mode 100644 index 460c43a..0000000 --- a/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/configuration/EipConfigurator.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.messaging.configuration; - -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; - -@Component -public class EipConfigurator { - - - - @Value("${consumer.inRouteHost}") - private String consumerInRouteHost; - - @Value("${consumer.inRoutePort}") - private String consumerInRoutePort; - - @Value("${subscriber.inRouteHost}") - private String subscriberInRouteHost; - - @Value("${subscriber.inRoutePort}") - private String subscriberInRoutePort; - - - @Value("${consumer.activityQUri}") - private String consumerActivityQUri; - - @Value("${consumer.publisherEndpointProtocol}") - private String publisherEndpointProtocol; - - @Value("${consumer.publisherEndpointUrlResource}") - private String publisherEndpointUrlResource; - - @Value("${consumer.receiveMethod}") - private String consumerReceiveMethod; - - @Value("${consumer.splitMethod}") - private String consumerSplitMethod; - - @Value("${subscriber.subscriberEndpointProtocol}") - private String subscriberEndpointProtocol; - - @Value("${subscriber.subscriberEndpointUrlResource}") - private String subscriberEndpointUrlResource; - - @Value("${subscriber.receiveMethod}") - private String subscriberReceiveMethod; - - @Value("${subscriber.postMethod}") - private String subscriberPostMethod; - - @Value("${subscriber.getMethod}") - private String subscriberGetMethod; - - - @Value("${servlet.baseUrlPath}") - private String baseUrlPath; - - - public static String ENDPOINT_PROTOCOL_JETTY="jetty:http://"; - public static String ENDPOINT_PROTOCOL_SERVLET="servlet:///"; - - public String getConsumerInRouteHost() { - return consumerInRouteHost; - } - - public String getConsumerInRoutePort() { - return consumerInRoutePort; - } - - public String getConsumerActivityQUri() { - return consumerActivityQUri; - } - - public void setConsumerActivityQUri(String consumerActivityQUri) { - this.consumerActivityQUri = consumerActivityQUri; - } - - public void setConsumerInRoutePort(String consumerInRoutePort) { - this.consumerInRoutePort = consumerInRoutePort; - } - - public void setConsumerInRouteHost(String consumerInRouteHost) { - this.consumerInRouteHost = consumerInRouteHost; - } - - public String getSubscriberInRouteHost() { - return subscriberInRouteHost; - } - - public void setSubscriberInRouteHost(String subscriberInRouteHost) { - this.subscriberInRouteHost = subscriberInRouteHost; - } - - public String getSubscriberInRoutePort() { - return subscriberInRoutePort; - } - - public void setSubscriberInRoutePort(String subscriberInRoutePort) { - this.subscriberInRoutePort = subscriberInRoutePort; - } - - public String getPublisherEndpointProtocol() { - return publisherEndpointProtocol; - } - - public void setPublisherEndpointProtocol(String publisherEndpointProtocol) { - this.publisherEndpointProtocol = publisherEndpointProtocol; - } - - public String getPublisherEndpointUrlResource() { - return publisherEndpointUrlResource; - } - - public void setPublisherEndpointUrlResource(String publisherEndpointUrlResource) { - this.publisherEndpointUrlResource = publisherEndpointUrlResource; - } - - public String getConsumerReceiveMethod() { - return consumerReceiveMethod; - } - - public void setConsumerReceiveMethod(String consumerReceiveMethod) { - this.consumerReceiveMethod = consumerReceiveMethod; - } - - public String getConsumerSplitMethod() { - return consumerSplitMethod; - } - - public void setConsumerSplitMethod(String consumerSplitMethod) { - this.consumerSplitMethod = consumerSplitMethod; - } - - public String getSubscriberEndpointProtocol() { - return subscriberEndpointProtocol; - } - - public void setSubscriberEndpointProtocol(String subscriberEndpointProtocol) { - this.subscriberEndpointProtocol = subscriberEndpointProtocol; - } - - public String getSubscriberEndpointUrlResource() { - return subscriberEndpointUrlResource; - } - - public void setSubscriberEndpointUrlResource(String subscriberEndpointUrlResource) { - this.subscriberEndpointUrlResource = subscriberEndpointUrlResource; - } - - public String getSubscriberReceiveMethod() { - return subscriberReceiveMethod; - } - - public void setSubscriberReceiveMethod(String subscriberReceiveMethod) { - this.subscriberReceiveMethod = subscriberReceiveMethod; - } - - public String getSubscriberPostMethod() { - return subscriberPostMethod; - } - - public void setSubscriberPostMethod(String subscriberPostMethod) { - this.subscriberPostMethod = subscriberPostMethod; - } - - public String getSubscriberGetMethod() { - return subscriberGetMethod; - } - - public void setSubscriberGetMethod(String subscriberGetMethod) { - this.subscriberGetMethod = subscriberGetMethod; - } - - public String getBaseUrlPath() { - return baseUrlPath; - } - - public void setBaseUrlPath(String baseUrlPath) { - this.baseUrlPath = baseUrlPath; - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityPublisherRegistrationProcessor.java ---------------------------------------------------------------------- diff --git a/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityPublisherRegistrationProcessor.java b/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityPublisherRegistrationProcessor.java deleted file mode 100644 index 0c60349..0000000 --- a/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityPublisherRegistrationProcessor.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.messaging.processors; - -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.streams.osgi.components.activityconsumer.ActivityConsumer; -import org.apache.streams.osgi.components.activityconsumer.impl.PushActivityConsumer; -import org.codehaus.jackson.map.DeserializationConfig; -import org.codehaus.jackson.map.ObjectMapper; - - -public class ActivityPublisherRegistrationProcessor implements Processor{ - private static final transient Log LOG = LogFactory.getLog(ActivityStreamsSubscriberRegistrationProcessor.class); - public void process(Exchange exchange){ - //add the necessary headers to the message so that the activity registration component - //can do a lookup to either make a new processor and endpoint, or pass the message to the right one - String httpMethod = exchange.getIn().getHeader("CamelHttpMethod").toString(); - - if (!httpMethod.equals("POST")){ - //reject anything that isn't a post...Camel 2.10 solves needing this check, however, SM 4.4 doesn't have the latest - exchange.getOut().setFault(true); - exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,405); - } else { - - //for now...just expect a post with a uri in the body...should have some checking here with http response codes - // authentication, all that good stuff...happens in the registration module - - String body = exchange.getIn().getBody(String.class); - ObjectMapper mapper = new ObjectMapper(); - mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES,false); - - try { - - // read from file, convert it to user class - ActivityConsumer configuration = mapper.readValue(body, ActivityConsumer.class); - if (configuration.getSrc()==null){ - LOG.info("configuration src is null"); - throw new Exception(); - } - - exchange.getOut().setBody(configuration); - - } catch (Exception e) { - LOG.info("error: " + e); - exchange.getOut().setFault(true); - exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,400); - exchange.getOut().setBody("POST should contain a valid JSON configuration for registering as an Activity Publisher (check that src element is valid)."); - } - } - - - - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityStreamsSubscriberRegistrationProcessor.java ---------------------------------------------------------------------- diff --git a/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityStreamsSubscriberRegistrationProcessor.java b/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityStreamsSubscriberRegistrationProcessor.java deleted file mode 100644 index 9306aa3..0000000 --- a/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityStreamsSubscriberRegistrationProcessor.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.messaging.processors; - -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.streams.messaging.service.SubscriptionService; -import org.apache.streams.messaging.service.impl.CassandraSubscriptionService; -import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscription; -import org.codehaus.jackson.map.DeserializationConfig; -import org.codehaus.jackson.map.ObjectMapper; - - -public class ActivityStreamsSubscriberRegistrationProcessor implements Processor{ - private static final transient Log LOG = LogFactory.getLog(ActivityStreamsSubscriberRegistrationProcessor.class); - private SubscriptionService subscriptionService; - - public ActivityStreamsSubscriberRegistrationProcessor(SubscriptionService subscriptionService){ - this.subscriptionService = subscriptionService; - } - - public void process(Exchange exchange){ - LOG.info("processing the subscriber..."); - //add the necessary headers to the message so that the activity registration component - //can do a lookup to either make a new processor and endpoint, or pass the message to the right one - String httpMethod = exchange.getIn().getHeader("CamelHttpMethod").toString(); - - if (!httpMethod.equals("POST")){ - //reject anything that isn't a post...Camel 2.10 solves needing this check, however, SM 4.4 doesn't have the latest - exchange.getOut().setFault(true); - exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,405); - } else { - - //for now...just expect a post with a uri in the body...should have some checking here with http response codes - // authentication, all that good stuff...happens in the registration module - - - String body = exchange.getIn().getBody(String.class); - - LOG.info("receiving the subscriber: "+body); - //OAuth token? What does subscriber post to init a subscription URL? - //maybe its a list of URLs to subscribe to subscriptions=1,2,3,4&auth_token=XXXX - - ObjectMapper mapper = new ObjectMapper(); - mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES,false); - - try { - - // read from file, convert it to user class - ActivityStreamsSubscription configuration = mapper.readValue(body, ActivityStreamsSubscription.class); - if(configuration.getFilters() == null){ - configuration.setFilters(subscriptionService.getFilters(configuration.getAuthToken())); - }else{ - subscriptionService.saveFilters(configuration); - } - exchange.getOut().setBody(configuration); - - } catch (Exception e) { - LOG.info("exception" + e); - exchange.getOut().setFault(true); - exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,400); - exchange.getOut().setBody("POST should contain a valid Subscription configuration object."); - } - - - - //just pass this on to the route creator, body will be the dedicated URL for this subscriber - - } - - - - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/ActivityConsumerRouteBuilder.java ---------------------------------------------------------------------- diff --git a/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/ActivityConsumerRouteBuilder.java b/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/ActivityConsumerRouteBuilder.java deleted file mode 100644 index dea8781..0000000 --- a/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/ActivityConsumerRouteBuilder.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.messaging.routers; - - - -import org.apache.camel.Exchange; -import org.apache.streams.osgi.components.activityconsumer.ActivityConsumer; - - -public interface ActivityConsumerRouteBuilder { - - - void createNewRouteForConsumer(Exchange exchange, ActivityConsumer activityConsumer); - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/ActivityStreamsSubscriberRouteBuilder.java ---------------------------------------------------------------------- diff --git a/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/ActivityStreamsSubscriberRouteBuilder.java b/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/ActivityStreamsSubscriberRouteBuilder.java deleted file mode 100644 index 6947722..0000000 --- a/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/ActivityStreamsSubscriberRouteBuilder.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.messaging.routers; - - - -import org.apache.camel.Exchange; -import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriber; - - -public interface ActivityStreamsSubscriberRouteBuilder { - - - void createNewRouteForSubscriber(Exchange exchange, ActivityStreamsSubscriber activityStreamsSubscriber); - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityConsumerRouter.java ---------------------------------------------------------------------- diff --git a/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityConsumerRouter.java b/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityConsumerRouter.java deleted file mode 100644 index 20b8246..0000000 --- a/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityConsumerRouter.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.messaging.routers.impl; - - -import org.apache.camel.builder.RouteBuilder; -import org.apache.streams.messaging.routers.ActivityConsumerRouteBuilder; - - -import org.apache.streams.osgi.components.activityconsumer.ActivityConsumerWarehouse; -import org.apache.streams.osgi.components.activityconsumer.ActivityConsumer; -import org.apache.streams.messaging.configuration.EipConfigurator; -import org.springframework.beans.factory.annotation.Autowired; -import org.apache.camel.Exchange; -import org.apache.camel.CamelContext; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import java.util.UUID; - - -public class ActivityConsumerRouter extends RouteBuilder implements ActivityConsumerRouteBuilder { - private static final transient Log LOG = LogFactory.getLog(ActivityConsumerRouter.class); - - @Autowired - private EipConfigurator configuration; - - protected CamelContext camelContext; - - private ActivityConsumerWarehouse activityConsumerWarehouse; - - public void setCamelContext(CamelContext camelContext) { - this.camelContext = camelContext; - } - - public void setActivityConsumerWarehouse(ActivityConsumerWarehouse activityConsumerWarehouse) { - this.activityConsumerWarehouse = activityConsumerWarehouse; - } - - - public void createNewRouteForConsumer(Exchange exchange, ActivityConsumer activityConsumer){ - - //todo: add some better scheme then getCount for URL... - //todo: make the route again if consumer exists...and context doesn't have route - if (activityConsumer.isAuthenticated()){ - ActivityConsumer existingConsumer = activityConsumerWarehouse.findConsumerBySrc(activityConsumer.getSrc().toASCIIString()); - - if (existingConsumer==null){ - - try{ - - if (configuration.getPublisherEndpointProtocol().equals(EipConfigurator.ENDPOINT_PROTOCOL_JETTY)){ - activityConsumer.setInRoute(configuration.getConsumerInRouteHost()+ ":" + configuration.getConsumerInRoutePort() +"/" + configuration.getPublisherEndpointUrlResource() + "/" + UUID.randomUUID()); - //set the body to the url the producer should post to - exchange.getOut().setBody("http://" + activityConsumer.getInRoute()); - }else if (configuration.getPublisherEndpointProtocol().equals(EipConfigurator.ENDPOINT_PROTOCOL_SERVLET)){ - activityConsumer.setInRoute( configuration.getPublisherEndpointUrlResource() + "/" + UUID.randomUUID()); - //set the body to the url the producer should post to - exchange.getOut().setBody(configuration.getBaseUrlPath() + activityConsumer.getInRoute()); - } else{ - throw new Exception("No supported endpoint protocol is configured."); - } - - - //setup a message queue for this consumer.getInRoute() - camelContext.addRoutes(new DynamicConsumerRouteBuilder(configuration,camelContext, configuration.getPublisherEndpointProtocol() + activityConsumer.getInRoute(), activityConsumer)); - - - LOG.info("all messages sent from " + activityConsumer.getSrc() + " must be posted to " + activityConsumer.getInRoute()); - //only add the route to the warehouse after its been created in messaging system... - activityConsumerWarehouse.register(activityConsumer); - }catch (Exception e){ - exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,500); - exchange.getOut().setBody("error creating route: " + e); - LOG.error("error creating route: " + e); - } - - } else{ - - exchange.getOut().setBody(configuration.getBaseUrlPath() + existingConsumer.getInRoute()); - } - - }else{ - exchange.getOut().setFault(true); - exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,401); - exchange.getOut().setBody("Authentication failed."); - } - - } - - - public void configure() throws java.lang.Exception{ - //nothing...set the context? - - } - - /** - * This route builder is a skeleton to add new routes at runtime - */ - private static final class DynamicConsumerRouteBuilder extends RouteBuilder { - private final String from; - private ActivityConsumer activityConsumer; - - - private EipConfigurator configuration; - - private DynamicConsumerRouteBuilder(EipConfigurator configuration, CamelContext context, String from, ActivityConsumer activityConsumer) { - super(context); - this.from = from; - this.activityConsumer = activityConsumer; - this.configuration = configuration; - } - - @Override - public void configure() throws Exception { - - - from(from) - .bean(activityConsumer, configuration.getConsumerReceiveMethod()).setBody(body()) - .split() - .method(activityConsumer, configuration.getConsumerSplitMethod()) - .to(configuration.getConsumerActivityQUri()); - - - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityStreamsSubscriberRouter.java ---------------------------------------------------------------------- diff --git a/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityStreamsSubscriberRouter.java b/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityStreamsSubscriberRouter.java deleted file mode 100644 index ad4b779..0000000 --- a/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityStreamsSubscriberRouter.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.messaging.routers.impl; - - -import org.apache.camel.CamelContext; -import org.apache.camel.Exchange; -import org.apache.camel.builder.RouteBuilder; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.streams.messaging.aggregation.ActivityAggregator; -import org.apache.streams.messaging.configuration.EipConfigurator; -import org.apache.streams.messaging.routers.ActivityStreamsSubscriberRouteBuilder; -import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriber; -import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriberWarehouse; -import org.springframework.beans.factory.annotation.Autowired; - -import java.util.HashMap; -import java.util.UUID; - - -public class ActivityStreamsSubscriberRouter extends RouteBuilder implements ActivityStreamsSubscriberRouteBuilder { - private static final transient Log LOG = LogFactory.getLog(ActivityStreamsSubscriberRouter.class); - - @Autowired - private EipConfigurator configuration; - - protected CamelContext camelContext; - - @Autowired - private ActivityAggregator activityAggregator; - - private ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse; - - public void setCamelContext(CamelContext camelContext) { - this.camelContext = camelContext; - } - - public void setActivityStreamsSubscriberWarehouse(ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse) { - this.activityStreamsSubscriberWarehouse = activityStreamsSubscriberWarehouse; - } - - - public void createNewRouteForSubscriber(Exchange exchange, ActivityStreamsSubscriber activityStreamsSubscriber){ - - //todo: add some better scheme then getCount for URL... - //todo: make the route again if subscriber exists...and context doesn't have route - if (activityStreamsSubscriber.isAuthenticated()){ - - try{ - - if (configuration.getSubscriberEndpointProtocol().equals(EipConfigurator.ENDPOINT_PROTOCOL_JETTY)){ - activityStreamsSubscriber.setInRoute(configuration.getSubscriberInRouteHost()+ ":" + configuration.getSubscriberInRoutePort() +"/" + configuration.getSubscriberEndpointUrlResource() + "/" + UUID.randomUUID()); - //set the body to the url the producer should post to - exchange.getOut().setBody("http://" + activityStreamsSubscriber.getInRoute()); - }else if (configuration.getSubscriberEndpointProtocol().equals(EipConfigurator.ENDPOINT_PROTOCOL_SERVLET)){ - activityStreamsSubscriber.setInRoute( configuration.getSubscriberEndpointUrlResource() + "/" + UUID.randomUUID()); - //set the body to the url the producer should post to - exchange.getOut().setBody(configuration.getBaseUrlPath() + activityStreamsSubscriber.getInRoute()); - } else{ - throw new Exception("No supported endpoint protocol is configured."); - } - - //setup a message queue for this consumer.getInRoute() - camelContext.addRoutes(new DynamicSubscriberRouteBuilder(configuration,camelContext, configuration.getSubscriberEndpointProtocol() + activityStreamsSubscriber.getInRoute(), activityStreamsSubscriber)); - - activityAggregator.updateSubscriber(activityStreamsSubscriber); - activityStreamsSubscriberWarehouse.register(activityStreamsSubscriber); - }catch (Exception e){ - exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,500); - exchange.getOut().setBody("error creating route: " + e); - LOG.error("error creating route: " + e); - } - - }else{ - exchange.getOut().setFault(true); - exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,401); - exchange.getOut().setBody("Authentication failed."); - } - - } - - - - - public void configure() throws Exception{ - //nothing...set the context? - - } - - /** - * This route builder is a skeleton to add new routes at runtime - */ - private static final class DynamicSubscriberRouteBuilder extends RouteBuilder { - private final String from; - private ActivityStreamsSubscriber activityStreamsSubscriber; - - - private EipConfigurator configuration; - - private DynamicSubscriberRouteBuilder(EipConfigurator configuration, CamelContext context, String from, ActivityStreamsSubscriber activityStreamsSubscriber) { - super(context); - this.from = from; - this.activityStreamsSubscriber = activityStreamsSubscriber; - this.configuration = configuration; - } - - @Override - public void configure() throws Exception { - - - from(from) - .choice() - .when(header("CamelHttpMethod").isEqualTo("POST")) - //when its a post...it goes to adding a new src - .bean(activityStreamsSubscriber, configuration.getSubscriberPostMethod()).setBody(body()) - .when(header("CamelHttpMethod").isEqualTo("GET")) - // when its a GET it goes to getStream() - .bean(activityStreamsSubscriber, configuration.getSubscriberGetMethod()) ; - - - - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/ActivityService.java ---------------------------------------------------------------------- diff --git a/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/ActivityService.java b/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/ActivityService.java deleted file mode 100644 index 0c85974..0000000 --- a/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/ActivityService.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.messaging.service; - -import org.apache.camel.Exchange; - -import java.util.Date; -import java.util.List; - -public interface ActivityService { - - void receiveExchange(Exchange exchange); - - List getActivitiesForFilters(List filters, Date lastUpdated); -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/SubscriptionService.java ---------------------------------------------------------------------- diff --git a/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/SubscriptionService.java b/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/SubscriptionService.java deleted file mode 100644 index 98f585d..0000000 --- a/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/SubscriptionService.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.messaging.service; - -import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscription; - -import java.util.List; - -public interface SubscriptionService { - - List getFilters(String authToken); - void saveFilters(ActivityStreamsSubscription subscription); -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java ---------------------------------------------------------------------- diff --git a/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java b/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java deleted file mode 100644 index 89f71ab..0000000 --- a/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.messaging.service.impl; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.camel.Exchange; -import org.apache.rave.model.ActivityStreamsEntry; -import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry; -import org.apache.streams.cassandra.repository.impl.CassandraActivityStreamsRepository; -import org.apache.streams.messaging.service.ActivityService; -import org.codehaus.jackson.map.DeserializationConfig; -import org.codehaus.jackson.map.ObjectMapper; -import org.springframework.beans.factory.annotation.Autowired; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.List; - -public class CassandraActivityService implements ActivityService { - - private static final transient Log LOG = LogFactory.getLog(CassandraActivityService.class); - - private CassandraActivityStreamsRepository cassandraActivityStreamsRepository; - private ObjectMapper mapper; - - @Autowired - public CassandraActivityService(CassandraActivityStreamsRepository cassandraActivityStreamsRepository, ObjectMapper mapper) { - this.cassandraActivityStreamsRepository = cassandraActivityStreamsRepository; - this.mapper = mapper; - mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); - } - - @Override - public void receiveExchange(Exchange exchange) { - - //receive the exchange as a list - List grouped = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class); - - for (Exchange e : grouped) { - //get activity off of exchange - LOG.info("Exchange: " + e); - - //extract the ActivityStreamsEntry object and save it in the database - LOG.info("About to preform the translation to JSON Object"); - String activityJson = e.getIn().getBody(String.class); - - try { - ActivityStreamsEntry streamsEntry = mapper.readValue(activityJson, CassandraActivityStreamsEntry.class); - streamsEntry.setPublished(new Date()); - cassandraActivityStreamsRepository.save(streamsEntry); - } catch (IOException err) { - LOG.error("there was an error while converting the json string to an object and saving to the database", err); - } - - } - } - - @Override - public List getActivitiesForFilters(List filters, Date lastUpdated) { - List activityObjects = cassandraActivityStreamsRepository.getActivitiesForFilters(filters, lastUpdated); - Collections.sort(activityObjects, Collections.reverseOrder()); - //TODO: make the number of streams returned configurable - return getJsonList(activityObjects.subList(0,Math.min(activityObjects.size(),10))); - } - - private List getJsonList(List activities) { - List jsonList = new ArrayList(); - for (ActivityStreamsEntry entry : activities) { - try { - jsonList.add(mapper.writeValueAsString(entry)); - } catch (IOException e) { - LOG.error("There was an error while trying to convert the java object to a string: " + entry, e); - } - } - return jsonList; - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraSubscriptionService.java ---------------------------------------------------------------------- diff --git a/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraSubscriptionService.java b/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraSubscriptionService.java deleted file mode 100644 index 8972d1e..0000000 --- a/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraSubscriptionService.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.messaging.service.impl; - -import org.apache.streams.cassandra.repository.impl.CassandraSubscriptionRepository; -import org.apache.streams.messaging.service.SubscriptionService; -import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscription; - -import java.util.Arrays; -import java.util.List; - -public class CassandraSubscriptionService implements SubscriptionService { - - private CassandraSubscriptionRepository repository; - - public CassandraSubscriptionService(CassandraSubscriptionRepository repository){ - this.repository = repository; - } - - public List getFilters(String authToken){ - return Arrays.asList(repository.getFilters(authToken).split(" ")); - } - - public void saveFilters(ActivityStreamsSubscription subscription){ - repository.save(subscription); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-eip-routes/src/main/resources/META-INF/spring/propertiesLoader.xml ---------------------------------------------------------------------- diff --git a/trunk/streams-eip-routes/src/main/resources/META-INF/spring/propertiesLoader.xml b/trunk/streams-eip-routes/src/main/resources/META-INF/spring/propertiesLoader.xml deleted file mode 100644 index 60a3f1f..0000000 --- a/trunk/streams-eip-routes/src/main/resources/META-INF/spring/propertiesLoader.xml +++ /dev/null @@ -1,35 +0,0 @@ - - - - - - - - - - - - - - - - - - - \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-eip-routes/src/main/resources/META-INF/spring/streams-eip-applicationContext.xml ---------------------------------------------------------------------- diff --git a/trunk/streams-eip-routes/src/main/resources/META-INF/spring/streams-eip-applicationContext.xml b/trunk/streams-eip-routes/src/main/resources/META-INF/spring/streams-eip-applicationContext.xml deleted file mode 100644 index a9b97a7..0000000 --- a/trunk/streams-eip-routes/src/main/resources/META-INF/spring/streams-eip-applicationContext.xml +++ /dev/null @@ -1,113 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-eip-routes/src/main/resources/META-INF/spring/streams-eip-osgi-component-import.xml ---------------------------------------------------------------------- diff --git a/trunk/streams-eip-routes/src/main/resources/META-INF/spring/streams-eip-osgi-component-import.xml b/trunk/streams-eip-routes/src/main/resources/META-INF/spring/streams-eip-osgi-component-import.xml deleted file mode 100644 index 9066206..0000000 --- a/trunk/streams-eip-routes/src/main/resources/META-INF/spring/streams-eip-osgi-component-import.xml +++ /dev/null @@ -1,38 +0,0 @@ - - - - - - - - - - - - - - - - \ No newline at end of file