storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [02/16] storm git commit: STORM-2416: break out storm-jms-examples
Date Mon, 20 Mar 2017 22:54:59 GMT
http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java
deleted file mode 100644
index 55e29bc..0000000
--- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java
+++ /dev/null
@@ -1,409 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.trident;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-
-import org.apache.storm.jms.JmsProvider;
-import org.apache.storm.jms.JmsTupleProducer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.ITridentSpout;
-import org.apache.storm.trident.topology.TransactionAttempt;
-import org.apache.storm.Config;
-import org.apache.storm.generated.StreamInfo;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsGetter;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.RotatingMap;
-import org.apache.storm.utils.Utils;
-
-/**
- * Trident implementation of the JmsSpout
- * <p>
- *
- */
-public class TridentJmsSpout implements ITridentSpout<JmsBatch> {
-
-    public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size";
-    
-    public static final int DEFAULT_BATCH_SIZE = 1000;
-
-    private static final long serialVersionUID = -3469351154693356655L;
-    
-    private JmsTupleProducer tupleProducer;
-
-    private JmsProvider jmsProvider;
-
-    private int jmsAcknowledgeMode;
-
-    private String name;
-
-    private static int nameIndex = 1;
-    
-    /**
-     * Create a TridentJmsSpout with a default name and acknowledge mode of AUTO_ACKNOWLEDGE
-     */
-    public TridentJmsSpout() {
-        this.name = "JmsSpout_"+(nameIndex++);
-        this.jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
-    }
-    
-    /**
-     * Set the name for this spout, to improve log identification
-     * @param name The name to be used in log messages
-     * @return This spout
-     */
-    public TridentJmsSpout named(String name) {
-        this.name = name;
-        return this;
-    }
-    
-    /**
-     * Set the <code>JmsProvider</code>
-     * implementation that this Spout will use to connect to 
-     * a JMS <code>javax.jms.Desination</code>
-     * 
-     * @param provider
-     */
-    public TridentJmsSpout withJmsProvider(JmsProvider provider){
-        this.jmsProvider = provider;
-        return this;
-    }
-    
-    /**
-     * Set the <code>JmsTupleProducer</code>
-     * implementation that will convert <code>javax.jms.Message</code>
-     * object to <code>backtype.storm.tuple.Values</code> objects
-     * to be emitted.
-     * 
-     * @param tupleProducer
-     * @return This spout
-     */
-    public TridentJmsSpout withTupleProducer(JmsTupleProducer tupleProducer) {
-        this.tupleProducer = tupleProducer;
-        return this;
-    }
-    
-    /**
-     * Set the JMS acknowledge mode for messages being processed by this spout.
-     * <p/>
-     * Possible values:
-     * <ul>
-     * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
-     * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
-     * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
-     * </ul>
-     * @param jmsAcknowledgeMode The chosen acknowledge mode
-     * @return This spout
-     * @throws IllegalArgumentException if the mode is not recognized
-     */
-    public TridentJmsSpout withJmsAcknowledgeMode(int jmsAcknowledgeMode) {
-        toDeliveryModeString(jmsAcknowledgeMode);
-        this.jmsAcknowledgeMode = jmsAcknowledgeMode;
-        return this;
-    }
-    
-    /**
-     * Return a friendly string for the given JMS acknowledge mode, or throw an IllegalArgumentException if
-     * the mode is not recognized.
-     * <p/>
-     * Possible values:
-     * <ul>
-     * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
-     * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
-     * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
-     * </ul>
-     * @param acknowledgeMode A valid JMS acknowledge mode
-     * @return A friendly string describing the acknowledge mode
-     * @throws IllegalArgumentException if the mode is not recognized
-     */
-    private static final String toDeliveryModeString(int acknowledgeMode) {
-        switch (acknowledgeMode) {
-        case Session.AUTO_ACKNOWLEDGE:
-            return "AUTO_ACKNOWLEDGE";
-        case Session.CLIENT_ACKNOWLEDGE:
-            return "CLIENT_ACKNOWLEDGE";
-        case Session.DUPS_OK_ACKNOWLEDGE:
-            return "DUPS_OK_ACKNOWLEDGE";
-        default:
-            throw new IllegalArgumentException("Unknown JMS Acknowledge mode " + acknowledgeMode + " (See javax.jms.Session for valid values)");
-        }
-    }
-    
-    @Override
-    public ITridentSpout.BatchCoordinator<JmsBatch> getCoordinator(
-            String txStateId, @SuppressWarnings("rawtypes") Map conf, TopologyContext context) {
-        return new JmsBatchCoordinator(name);
-    }
-
-    @Override
-    public Emitter<JmsBatch> getEmitter(String txStateId, @SuppressWarnings("rawtypes") Map conf, TopologyContext context) {
-        return new JmsEmitter(name, jmsProvider, tupleProducer, jmsAcknowledgeMode, conf);
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
-    }
-
-    @Override
-    public Fields getOutputFields() {
-        OutputFieldsGetter fieldGetter = new OutputFieldsGetter();
-        tupleProducer.declareOutputFields(fieldGetter);
-        StreamInfo streamInfo = fieldGetter.getFieldsDeclaration().get(Utils.DEFAULT_STREAM_ID);
-        if (streamInfo == null) {
-            throw new IllegalArgumentException("Jms Tuple producer has not declared output fields for the default stream");
-        }
-        
-        return new Fields(streamInfo.get_output_fields());
-    }
-    
-    /**
-     * The JmsEmitter class listens for incoming messages and stores them in a blocking queue. On each invocation of emit,
-     * the queued messages are emitted as a batch.
-     *
-     */
-    private class JmsEmitter implements Emitter<JmsBatch>, MessageListener {
-
-        private final LinkedBlockingQueue<Message> queue;
-        private final Connection connection;
-        private final Session session;
-
-        private final RotatingMap<Long, List<Message>> batchMessageMap; // Maps transaction Ids to JMS message ids.
-        
-        private final long rotateTimeMillis;
-        private final int maxBatchSize;
-        private final String name;
-        
-        private long lastRotate;
-       
-        private final Logger LOG = LoggerFactory.getLogger(JmsEmitter.class);
- 
-        public JmsEmitter(String name, JmsProvider jmsProvider, JmsTupleProducer tupleProducer, int jmsAcknowledgeMode, @SuppressWarnings("rawtypes") Map conf) {
-            if (jmsProvider == null) {
-                throw new IllegalStateException("JMS provider has not been set.");
-            }
-            if (tupleProducer == null) {
-                throw new IllegalStateException("JMS Tuple Producer has not been set.");
-            }
-
-            this.queue = new LinkedBlockingQueue<Message>();
-            this.name = name;
-            
-            batchMessageMap = new RotatingMap<Long, List<Message>>(3);
-            rotateTimeMillis = 1000L * ((Number)conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
-            lastRotate = System.currentTimeMillis();
-            
-            Number batchSize = (Number) conf.get(MAX_BATCH_SIZE_CONF);
-            maxBatchSize = batchSize != null ? batchSize.intValue() : DEFAULT_BATCH_SIZE;
-
-            try {
-                ConnectionFactory cf = jmsProvider.connectionFactory();
-                Destination dest = jmsProvider.destination();
-                this.connection = cf.createConnection();
-                this.session = connection.createSession(false, jmsAcknowledgeMode);
-                MessageConsumer consumer = session.createConsumer(dest);
-                consumer.setMessageListener(this);
-                this.connection.start();
-
-                LOG.info("Created JmsEmitter with max batch size "+maxBatchSize+" rotate time "+rotateTimeMillis+"ms and destination "+dest+" for "+name);
-
-            } catch (Exception e) {
-                LOG.warn("Error creating JMS connection.", e);
-                throw new IllegalStateException("Could not create JMS connection for spout ", e);
-            }
-            
-        }
-        
-        @Override
-        public void success(TransactionAttempt tx) {
-            
-            @SuppressWarnings("unchecked")
-            List<Message> messages = (List<Message>) batchMessageMap.remove(tx.getTransactionId());
-            
-            if (messages != null) {
-                if (!messages.isEmpty()) {
-                    LOG.debug("Success for batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name);
-                }
-                
-                for (Message msg: messages) {
-                    String messageId = "UnknownId";
-                    
-                    try {
-                        messageId = msg.getJMSMessageID();
-                        msg.acknowledge();
-                        LOG.trace("Acknowledged message "+messageId);
-                    } catch (JMSException e) {
-                        LOG.warn("Failed to acknowledge message "+messageId, e);
-                    }
-                }
-            }
-            else {
-                LOG.warn("No messages found in batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId());
-            }
-        }
-
-        /**
-         * Fail a batch with the given transaction id. This is called when a batch is timed out, or a new batch with a 
-         * matching transaction id is emitted. Note that the current implementation does nothing - i.e. it discards 
-         * messages that have been failed.
-         * @param transactionId The transaction id of the failed batch
-         * @param messages The list of messages to fail.
-         */
-        private void fail(Long transactionId, List<Message> messages) {
-            LOG.debug("Failure for batch with transaction id "+transactionId+" for "+name);
-            if (messages != null) {
-                for (Message msg: messages) {
-                    try {
-                        LOG.trace("Failed message "+msg.getJMSMessageID());
-                    } catch (JMSException e) {
-                        LOG.warn("Could not identify failed message ", e);
-                    }
-                }
-            }
-            else {
-                LOG.warn("Failed batch has no messages with transaction id "+transactionId);
-            }            
-        }
-
-        @Override
-        public void close() {
-            try {
-                LOG.info("Closing JMS connection.");
-                this.session.close();
-                this.connection.close();
-            } catch (JMSException e) {
-                LOG.warn("Error closing JMS connection.", e);
-            }   
-        }
-
-        @Override
-        public void emitBatch(TransactionAttempt tx, JmsBatch coordinatorMeta,
-                TridentCollector collector) {
-            
-            long now = System.currentTimeMillis();
-            if(now - lastRotate > rotateTimeMillis) {
-                Map<Long, List<Message>> failed = batchMessageMap.rotate();
-                for(Long id: failed.keySet()) {
-                    LOG.warn("TIMED OUT batch with transaction id "+id+" for "+name);
-                    fail(id, failed.get(id));
-                }
-                lastRotate = now;
-            }
-            
-            if(batchMessageMap.containsKey(tx.getTransactionId())) {
-                LOG.warn("FAILED duplicate batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name);
-                fail(tx.getTransactionId(), batchMessageMap.get(tx.getTransactionId()));
-            }
-            
-            List<Message> batchMessages = new ArrayList<Message>();
-            
-            for (int index=0; index<maxBatchSize; index++) {
-                Message msg = queue.poll();
-                if (msg == null) {
-                    Utils.sleep(50); // Back off
-                    break;
-                }
-                
-                try {
-                    if (TridentJmsSpout.this.jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE) {
-                        batchMessages.add(msg);
-                    }
-                    Values tuple = tupleProducer.toTuple(msg);
-                    collector.emit(tuple);
-                } catch (JMSException e) {
-                    LOG.warn("Failed to emit message, could not retrieve data for "+name+": "+e );
-                }
-            }
-            
-            if (!batchMessages.isEmpty()) {
-                LOG.debug("Emitting batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" and size "+batchMessages.size()+" for "+name);
-            }
-            else {
-                LOG.trace("No items to acknowledge for batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name);
-            }
-            batchMessageMap.put(tx.getTransactionId(), batchMessages);
-        }
-
-        @Override
-        public void onMessage(Message msg) {
-            try {
-                LOG.trace("Queuing msg [" + msg.getJMSMessageID() + "]");
-            } catch (JMSException e) {
-                // Nothing here, could not get message id
-            }
-            this.queue.offer(msg);
-        }
-        
-    }
-    
-    /**
-     * Bare implementation of a BatchCoordinator, returning a null JmsBatch object
-     *
-     */
-    private class JmsBatchCoordinator implements BatchCoordinator<JmsBatch> {
-
-        private final String name;
-        
-        private final Logger LOG = LoggerFactory.getLogger(JmsBatchCoordinator.class);
-
-        public JmsBatchCoordinator(String name) {
-            this.name = name;
-            LOG.info("Created batch coordinator for "+name);
-        }
-        
-        @Override
-        public JmsBatch initializeTransaction(long txid, JmsBatch prevMetadata, JmsBatch curMetadata) {
-            LOG.debug("Initialise transaction "+txid+" for "+name);
-            return null;
-        }
-
-        @Override
-        public void success(long txid) {
-        }
-
-        @Override
-        public boolean isReady(long txid) {
-            return true;
-        }
-
-        @Override
-        public void close() {
-        }
-        
-    }
-
-}
-
-    
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java b/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
deleted file mode 100644
index e80f70a..0000000
--- a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.spout;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.HashMap;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.storm.jms.JmsProvider;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JmsSpoutTest {
-    private static final Logger LOG = LoggerFactory.getLogger(JmsSpoutTest.class);
-
-    @Test
-    public void testFailure() throws JMSException, Exception{
-        JmsSpout spout = new JmsSpout();
-        JmsProvider mockProvider = new MockJmsProvider();
-        MockSpoutOutputCollector mockCollector = new MockSpoutOutputCollector();
-        SpoutOutputCollector collector = new SpoutOutputCollector(mockCollector);
-        spout.setJmsProvider(new MockJmsProvider());
-        spout.setJmsTupleProducer(new MockTupleProducer());
-        spout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
-        spout.setRecoveryPeriod(10); // Rapid recovery for testing.
-        spout.open(new HashMap<String,String>(), null, collector);
-        Message msg = this.sendMessage(mockProvider.connectionFactory(), mockProvider.destination());
-        Thread.sleep(100);
-        spout.nextTuple(); // Pretend to be storm.
-        Assert.assertTrue(mockCollector.emitted);
-        
-        mockCollector.reset();        
-        spout.fail(msg.getJMSMessageID()); // Mock failure
-        Thread.sleep(5000);
-        spout.nextTuple(); // Pretend to be storm.
-        Thread.sleep(5000);
-        Assert.assertTrue(mockCollector.emitted); // Should have been re-emitted
-    }
-
-    @Test
-    public void testSerializability() throws IOException{
-        JmsSpout spout = new JmsSpout();
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        ObjectOutputStream oos = new ObjectOutputStream(out);
-        oos.writeObject(spout);
-        oos.close();
-        Assert.assertTrue(out.toByteArray().length > 0);
-    }
-    
-    public Message sendMessage(ConnectionFactory connectionFactory, Destination destination) throws JMSException {        
-        Session mySess = connectionFactory.createConnection().createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        MessageProducer producer = mySess.createProducer(destination);
-        TextMessage msg = mySess.createTextMessage();
-        msg.setText("Hello World");
-        LOG.info("Sending Message: {}", msg.getText());
-        producer.send(msg);
-        return msg;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java b/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java
deleted file mode 100644
index 3ba0853..0000000
--- a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.spout;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-
-import org.apache.storm.jms.JmsProvider;
-
-public class MockJmsProvider implements JmsProvider {
-    private static final long serialVersionUID = 1L;
-
-    private ConnectionFactory connectionFactory = null;
-    private Destination destination = null;
-    
-    public MockJmsProvider() throws NamingException{
-        this.connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); 
-        Context jndiContext = new InitialContext();
-        this.destination = (Destination) jndiContext.lookup("dynamicQueues/FOO.BAR");        
-
-    }
-    
-    /**
-     * Provides the JMS <code>ConnectionFactory</code>
-     * @return the connection factory
-     * @throws Exception
-     */
-    public ConnectionFactory connectionFactory() throws Exception{
-        return this.connectionFactory;
-    }
-
-    /**
-     * Provides the <code>Destination</code> (topic or queue) from which the
-     * <code>JmsSpout</code> will receive messages.
-     * @return
-     * @throws Exception
-     */
-    public Destination destination() throws Exception{
-        return this.destination;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java b/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java
deleted file mode 100644
index a5a6c51..0000000
--- a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.spout;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.storm.spout.ISpoutOutputCollector;
-
-public class MockSpoutOutputCollector implements ISpoutOutputCollector {
-    boolean emitted = false;
-
-    @Override
-    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
-        emitted = true;
-        return new ArrayList<Integer>();
-    }
-
-    @Override
-    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
-        emitted = true;
-    }
-
-    @Override
-    public void reportError(Throwable error) {
-    }
-
-    public boolean emitted(){
-        return this.emitted;
-    }
-
-    public void reset(){
-        this.emitted = false;
-    }
-
-    @Override
-    public long getPendingCount() {
-        return 0;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java b/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java
deleted file mode 100644
index ea571fc..0000000
--- a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.spout;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.TextMessage;
-
-import org.apache.storm.jms.JmsTupleProducer;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-public class MockTupleProducer implements JmsTupleProducer {
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public Values toTuple(Message msg) throws JMSException {
-        if (msg instanceof TextMessage) {
-            String json = ((TextMessage) msg).getText();
-            return new Values(json);
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("json"));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/test/resources/jndi.properties
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/test/resources/jndi.properties b/external/storm-jms/core/src/test/resources/jndi.properties
deleted file mode 100644
index af19521..0000000
--- a/external/storm-jms/core/src/test/resources/jndi.properties
+++ /dev/null
@@ -1,18 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
-java.naming.provider.url = vm://localhost?broker.persistent=false
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/examples/README.markdown
----------------------------------------------------------------------
diff --git a/external/storm-jms/examples/README.markdown b/external/storm-jms/examples/README.markdown
deleted file mode 100644
index 7a4d8f0..0000000
--- a/external/storm-jms/examples/README.markdown
+++ /dev/null
@@ -1,12 +0,0 @@
-## About Storm JMS Examples
-This project contains a simple storm topology that illustrates the usage of "storm-jms".
-
-To build:
-
-`mvn clean install`
-
-The default build will create a jar file that can be deployed to to a Storm cluster in the "target" directory:
-
-`storm-jms-examples-0.1-SNAPSHOT-jar-with-dependencies.jar`
-
-

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/examples/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-jms/examples/pom.xml b/external/storm-jms/examples/pom.xml
deleted file mode 100644
index 1809e34..0000000
--- a/external/storm-jms/examples/pom.xml
+++ /dev/null
@@ -1,151 +0,0 @@
-<?xml version="1.0"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.storm</groupId>
-        <artifactId>storm-jms-parent</artifactId>
-        <version>1.1.0-SNAPSHOT</version>
-        <relativePath>../pom.xml</relativePath>
-    </parent>
-
-
-    <artifactId>storm-jms-examples</artifactId>
-
-    <properties>
-        <spring.version>2.5.6</spring.version>
-    </properties>
-    <dependencies>
-        <dependency>
-            <groupId>org.springframework</groupId>
-            <artifactId>spring-beans</artifactId>
-            <version>${spring.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.springframework</groupId>
-            <artifactId>spring-core</artifactId>
-            <version>${spring.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.springframework</groupId>
-            <artifactId>spring-context</artifactId>
-            <version>${spring.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.springframework</groupId>
-            <artifactId>spring-jms</artifactId>
-            <version>${spring.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.xbean</groupId>
-            <artifactId>xbean-spring</artifactId>
-            <version>3.7</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
-            <version>${project.version}</version>
-            <!-- keep storm out of the jar-with-dependencies -->
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-jms</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.activemq</groupId>
-            <artifactId>activemq-core</artifactId>
-            <version>5.4.0</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-    </dependencies>
-    <build>
-        <plugins>
-            <!-- bind the maven-assembly-plugin to the package phase this will create
-                a jar file without the storm dependencies suitable for deployment to a cluster. -->
-            <plugin>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <configuration>
-                    <descriptorRefs>
-                        <descriptorRef>jar-with-dependencies</descriptorRef>
-                    </descriptorRefs>
-                    <archive>
-                        <manifest>
-                            <mainClass></mainClass>
-                        </manifest>
-                    </archive>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>make-assembly</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>single</goal>
-                        </goals>
-                    </execution>
-                </executions>
-
-            </plugin>
-
-            <plugin>
-                <groupId>org.codehaus.mojo</groupId>
-                <artifactId>exec-maven-plugin</artifactId>
-                <version>1.2.1</version>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>exec</goal>
-                        </goals>
-                    </execution>
-                </executions>
-                <configuration>
-                    <executable>java</executable>
-                    <includeProjectDependencies>true</includeProjectDependencies>
-                    <includePluginDependencies>true</includePluginDependencies>
-                    <mainClass>org.apache.storm.jms.example.ExampleJmsTopology</mainClass>
-                    <systemProperties>
-                        <systemProperty>
-                            <key>log4j.configuration</key>
-                            <value>file:./src/main/resources/log4j.properties</value>
-                        </systemProperty>
-                    </systemProperties>
-                </configuration>
-                <dependencies>
-                    <dependency>
-                        <groupId>org.apache.storm</groupId>
-                        <artifactId>storm-core</artifactId>
-                        <version>${project.version}</version>
-                        <type>jar</type>
-                    </dependency>
-                </dependencies>
-            </plugin>
-        </plugins>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java b/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
deleted file mode 100644
index 3324aac..0000000
--- a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.example;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.jms.JmsMessageProducer;
-import org.apache.storm.jms.JmsProvider;
-import org.apache.storm.jms.JmsTupleProducer;
-import org.apache.storm.jms.bolt.JmsBolt;
-import org.apache.storm.jms.spout.JmsSpout;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.ITuple;
-import org.apache.storm.utils.Utils;
-
-public class ExampleJmsTopology {
-    public static final String JMS_QUEUE_SPOUT = "JMS_QUEUE_SPOUT";
-    public static final String INTERMEDIATE_BOLT = "INTERMEDIATE_BOLT";
-    public static final String FINAL_BOLT = "FINAL_BOLT";
-    public static final String JMS_TOPIC_BOLT = "JMS_TOPIC_BOLT";
-    public static final String JMS_TOPIC_SPOUT = "JMS_TOPIC_SPOUT";
-    public static final String ANOTHER_BOLT = "ANOTHER_BOLT";
-
-    @SuppressWarnings("serial")
-    public static void main(String[] args) throws Exception {
-
-        // JMS Queue Provider
-        JmsProvider jmsQueueProvider = new SpringJmsProvider(
-                "jms-activemq.xml", "jmsConnectionFactory",
-                "notificationQueue");
-
-        // JMS Topic provider
-        JmsProvider jmsTopicProvider = new SpringJmsProvider(
-                "jms-activemq.xml", "jmsConnectionFactory",
-                "notificationTopic");
-
-        // JMS Producer
-        JmsTupleProducer producer = new JsonTupleProducer();
-
-        // JMS Queue Spout
-        JmsSpout queueSpout = new JmsSpout();
-        queueSpout.setJmsProvider(jmsQueueProvider);
-        queueSpout.setJmsTupleProducer(producer);
-        queueSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
-        queueSpout.setDistributed(true); // allow multiple instances
-
-        TopologyBuilder builder = new TopologyBuilder();
-
-        // spout with 5 parallel instances
-        builder.setSpout(JMS_QUEUE_SPOUT, queueSpout, 5);
-
-        // intermediate bolt, subscribes to jms spout, anchors on tuples, and auto-acks
-        builder.setBolt(INTERMEDIATE_BOLT,
-                new GenericBolt("INTERMEDIATE_BOLT", true, true, new Fields("json")), 3).shuffleGrouping(
-                JMS_QUEUE_SPOUT);
-
-        // bolt that subscribes to the intermediate bolt, and auto-acks
-        // messages.
-        builder.setBolt(FINAL_BOLT, new GenericBolt("FINAL_BOLT", true, true), 3).shuffleGrouping(
-                INTERMEDIATE_BOLT);
-
-        // bolt that subscribes to the intermediate bolt, and publishes to a JMS Topic
-        JmsBolt jmsBolt = new JmsBolt();
-        jmsBolt.setJmsProvider(jmsTopicProvider);
-
-        // anonymous message producer just calls toString() on the tuple to create a jms message
-        jmsBolt.setJmsMessageProducer(new JmsMessageProducer() {
-            @Override
-            public Message toMessage(Session session, ITuple input) throws JMSException {
-                System.out.println("Sending JMS Message:" + input.toString());
-                TextMessage tm = session.createTextMessage(input.toString());
-                return tm;
-            }
-        });
-
-        builder.setBolt(JMS_TOPIC_BOLT, jmsBolt).shuffleGrouping(INTERMEDIATE_BOLT);
-
-        // JMS Topic spout
-        JmsSpout topicSpout = new JmsSpout();
-        topicSpout.setJmsProvider(jmsTopicProvider);
-        topicSpout.setJmsTupleProducer(producer);
-        topicSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
-        topicSpout.setDistributed(false);
-
-        builder.setSpout(JMS_TOPIC_SPOUT, topicSpout);
-
-        builder.setBolt(ANOTHER_BOLT, new GenericBolt("ANOTHER_BOLT", true, true), 1).shuffleGrouping(
-                JMS_TOPIC_SPOUT);
-
-        Config conf = new Config();
-
-        if (args.length > 0) {
-            conf.setNumWorkers(3);
-
-            StormSubmitter.submitTopology(args[0], conf,
-                    builder.createTopology());
-        } else {
-
-            conf.setDebug(true);
-
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("storm-jms-example", conf, builder.createTopology());
-            Utils.sleep(60000);
-            cluster.killTopology("storm-jms-example");
-            cluster.shutdown();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/GenericBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/GenericBolt.java b/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/GenericBolt.java
deleted file mode 100644
index 57de1ba..0000000
--- a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/GenericBolt.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.example;
-
-import java.util.Map;
-
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-
-/**
- * A generic <code>org.apache.storm.topology.IRichBolt</code> implementation
- * for testing/debugging the Storm JMS Spout and example topologies.
- * <p>
- * For debugging purposes, set the log level of the
- * <code>org.apache.storm.contrib.jms</code> package to DEBUG for debugging
- * output.
- *
- * @author tgoetz
- */
-@SuppressWarnings("serial")
-public class GenericBolt extends BaseRichBolt {
-    private static final Logger LOG = LoggerFactory.getLogger(GenericBolt.class);
-    private OutputCollector collector;
-    private boolean autoAck = false;
-    private boolean autoAnchor = false;
-    private Fields declaredFields;
-    private String name;
-
-    /**
-     * Constructs a new <code>GenericBolt</code> instance.
-     *
-     * @param name           The name of the bolt (used in DEBUG logging)
-     * @param autoAck        Whether or not this bolt should automatically acknowledge received tuples.
-     * @param autoAnchor     Whether or not this bolt should automatically anchor to received tuples.
-     * @param declaredFields The fields this bolt declares as output.
-     */
-    public GenericBolt(String name, boolean autoAck, boolean autoAnchor, Fields declaredFields) {
-        this.name = name;
-        this.autoAck = autoAck;
-        this.autoAnchor = autoAnchor;
-        this.declaredFields = declaredFields;
-    }
-
-    public GenericBolt(String name, boolean autoAck, boolean autoAnchor) {
-        this(name, autoAck, autoAnchor, null);
-    }
-
-    @SuppressWarnings("rawtypes")
-    public void prepare(Map stormConf, TopologyContext context,
-                        OutputCollector collector) {
-        this.collector = collector;
-
-    }
-
-    public void execute(Tuple input) {
-        LOG.debug("[" + this.name + "] Received message: " + input);
-
-
-        // only emit if we have declared fields.
-        if (this.declaredFields != null) {
-            LOG.debug("[" + this.name + "] emitting: " + input);
-            if (this.autoAnchor) {
-                this.collector.emit(input, input.getValues());
-            } else {
-                this.collector.emit(input.getValues());
-            }
-        }
-
-        if (this.autoAck) {
-            LOG.debug("[" + this.name + "] ACKing tuple: " + input);
-            this.collector.ack(input);
-        }
-
-    }
-
-    public void cleanup() {
-
-    }
-
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        if (this.declaredFields != null) {
-            declarer.declare(this.declaredFields);
-        }
-    }
-
-    public boolean isAutoAck() {
-        return this.autoAck;
-    }
-
-    public void setAutoAck(boolean autoAck) {
-        this.autoAck = autoAck;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/JsonTupleProducer.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/JsonTupleProducer.java b/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/JsonTupleProducer.java
deleted file mode 100644
index 9ee175e..0000000
--- a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/JsonTupleProducer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.example;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.TextMessage;
-
-import org.apache.storm.jms.JmsTupleProducer;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-/**
- * A simple <code>JmsTupleProducer</code> that expects to receive
- * JMS <code>TextMessage</code> objects with a body in JSON format.
- * <p/>
- * Ouputs a tuple with field name "json" and a string value
- * containing the raw json.
- * <p/>
- * <b>NOTE: </b> Currently this implementation assumes the text is valid
- * JSON and does not attempt to parse or validate it.
- * 
- * @author tgoetz
- *
- */
-@SuppressWarnings("serial")
-public class JsonTupleProducer implements JmsTupleProducer {
-
-	public Values toTuple(Message msg) throws JMSException {
-		if(msg instanceof TextMessage){
-			String json = ((TextMessage) msg).getText();
-			return new Values(json);
-		} else {
-			return null;
-		}
-	}
-
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("json"));
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java b/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java
deleted file mode 100644
index 306fc25..0000000
--- a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.example;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.support.ClassPathXmlApplicationContext;
-
-import org.apache.storm.jms.JmsProvider;
-
-
-/**
- * A <code>JmsProvider</code> that uses the spring framework
- * to obtain a JMS <code>ConnectionFactory</code> and 
- * <code>Desitnation</code> objects.
- * <p/>
- * The constructor takes three arguments:
- * <ol>
- * <li>A string pointing to the the spring application context file contining the JMS configuration
- * (must be on the classpath)
- * </li>
- * <li>The name of the connection factory bean</li>
- * <li>The name of the destination bean</li>
- * </ol>
- * 
- *
- *
- */
-@SuppressWarnings("serial")
-public class SpringJmsProvider implements JmsProvider {
-	private ConnectionFactory connectionFactory;
-	private Destination destination;
-	
-	/**
-	 * Constructs a <code>SpringJmsProvider</code> object given the name of a
-	 * classpath resource (the spring application context file), and the bean
-	 * names of a JMS connection factory and destination.
-	 * 
-	 * @param appContextClasspathResource - the spring configuration file (classpath resource)
-	 * @param connectionFactoryBean - the JMS connection factory bean name
-	 * @param destinationBean - the JMS destination bean name
-	 */
-	public SpringJmsProvider(String appContextClasspathResource, String connectionFactoryBean, String destinationBean){
-		ApplicationContext context = new ClassPathXmlApplicationContext(appContextClasspathResource);
-		this.connectionFactory = (ConnectionFactory)context.getBean(connectionFactoryBean);
-		this.destination = (Destination)context.getBean(destinationBean);
-	}
-
-	public ConnectionFactory connectionFactory() throws Exception {
-		return this.connectionFactory;
-	}
-
-	public Destination destination() throws Exception {
-		return this.destination;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/examples/src/main/resources/jms-activemq.xml
----------------------------------------------------------------------
diff --git a/external/storm-jms/examples/src/main/resources/jms-activemq.xml b/external/storm-jms/examples/src/main/resources/jms-activemq.xml
deleted file mode 100644
index 1a845b8..0000000
--- a/external/storm-jms/examples/src/main/resources/jms-activemq.xml
+++ /dev/null
@@ -1,53 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<beans 
-  xmlns="http://www.springframework.org/schema/beans" 
-  xmlns:amq="http://activemq.apache.org/schema/core"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
-  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
-
-	<!-- ActiveMQ -->
-
-	<!-- embedded ActiveMQ Broker -->
-	<!-- <amq:broker useJmx="false" persistent="false">
-		<amq:transportConnectors>
-			<amq:transportConnector uri="tcp://localhost:61616" />
-		</amq:transportConnectors>
-	</amq:broker> -->
-
-	<amq:queue id="notificationQueue" physicalName="backtype.storm.contrib.example.queue" />
-	
-	<amq:topic id="notificationTopic" physicalName="backtype.storm.contrib.example.topic" />
-
-	<amq:connectionFactory id="jmsConnectionFactory"
-		brokerURL="tcp://localhost:61616" />
-
-	<!-- <bean id="queueTemplate" class="org.springframework.jms.core.JmsTemplate">
-		<property name="connectionFactory">
-			<ref bean="jmsConnectionFactory" />
-		</property>
-		<property name="pubSubDomain" value="false" />
-	</bean> -->
-	
-</beans>
-
-
-
-
-

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/examples/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/storm-jms/examples/src/main/resources/log4j.properties b/external/storm-jms/examples/src/main/resources/log4j.properties
deleted file mode 100644
index 079b195..0000000
--- a/external/storm-jms/examples/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,29 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-log4j.rootLogger=INFO, stdout
-
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-
-log4j.appender.stdout.layout.ConversionPattern=%5p (%C:%L) - %m%n
-
-
-log4j.logger.backtype.storm.contrib=DEBUG
-log4j.logger.clojure.contrib=WARN
-log4j.logger.org.springframework=WARN
-log4j.logger.org.apache.zookeeper=WARN
-

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-jms/pom.xml b/external/storm-jms/pom.xml
index 7364394..b6fff85 100644
--- a/external/storm-jms/pom.xml
+++ b/external/storm-jms/pom.xml
@@ -27,8 +27,9 @@
 
 
 
-    <artifactId>storm-jms-parent</artifactId>
-    <packaging>pom</packaging>
+    <artifactId>storm-jms</artifactId>
+
+
 
     <developers>
         <developer>
@@ -38,11 +39,6 @@
         </developer>
     </developers>
 
-    <modules>
-        <module>core</module>
-        <module>examples</module>
-    </modules>
-
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>
@@ -51,6 +47,35 @@
             <!-- keep storm out of the jar-with-dependencies -->
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.geronimo.specs</groupId>
+            <artifactId>geronimo-jms_1.1_spec</artifactId>
+            <version>1.1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.10</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Active MQ -->
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-core</artifactId>
+            <version>5.5.1</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
 
     </dependencies>
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java
new file mode 100644
index 0000000..4932929
--- /dev/null
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms;
+
+import java.io.Serializable;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.apache.storm.tuple.ITuple;
+
+/**
+ * JmsMessageProducer implementations are responsible for translating
+ * a <code>org.apache.storm.tuple.Values</code> instance into a
+ * <code>javax.jms.Message</code> object.
+ * <p>
+ */
+public interface JmsMessageProducer extends Serializable {
+
+    /**
+     * Translate a <code>org.apache.storm.tuple.Tuple</code> object
+     * to a <code>javax.jms.Message</code object.
+     *
+     * @param session
+     * @param input
+     * @return
+     * @throws JMSException
+     */
+    public Message toMessage(Session session, ITuple input) throws JMSException;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/src/main/java/org/apache/storm/jms/JmsProvider.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/JmsProvider.java b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsProvider.java
new file mode 100644
index 0000000..d976326
--- /dev/null
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsProvider.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms;
+
+import java.io.Serializable;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+
+/**
+ * A <code>JmsProvider</code> object encapsulates the <code>ConnectionFactory</code>
+ * and <code>Destination</code> JMS objects the <code>JmsSpout</code> needs to manage
+ * a topic/queue connection over the course of it's lifecycle.
+ *
+ */
+public interface JmsProvider extends Serializable {
+    /**
+     * Provides the JMS <code>ConnectionFactory</code>
+     *
+     * @return the connection factory
+     * @throws Exception
+     */
+    public ConnectionFactory connectionFactory() throws Exception;
+
+    /**
+     * Provides the <code>Destination</code> (topic or queue) from which the
+     * <code>JmsSpout</code> will receive messages.
+     *
+     * @return
+     * @throws Exception
+     */
+    public Destination destination() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/src/main/java/org/apache/storm/jms/JmsTupleProducer.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/JmsTupleProducer.java b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsTupleProducer.java
new file mode 100644
index 0000000..0bbb3a0
--- /dev/null
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsTupleProducer.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms;
+
+import java.io.Serializable;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Values;
+
+/**
+ * Interface to define classes that can produce a Storm <code>Values</code> objects
+ * from a <code>javax.jms.Message</code> object>.
+ * <p>
+ * Implementations are also responsible for declaring the output
+ * fields they produce.
+ * <p>
+ * If for some reason the implementation can't process a message
+ * (for example if it received a <code>javax.jms.ObjectMessage</code>
+ * when it was expecting a <code>javax.jms.TextMessage</code> it should
+ * return <code>null</code> to indicate to the <code>JmsSpout</code> that
+ * the message could not be processed.
+ *
+ */
+public interface JmsTupleProducer extends Serializable {
+    /**
+     * Process a JMS message object to create a Values object.
+     *
+     * @param msg - the JMS message
+     * @return the Values tuple, or null if the message couldn't be processed.
+     * @throws JMSException
+     */
+    Values toTuple(Message msg) throws JMSException;
+
+    /**
+     * Declare the output fields produced by this JmsTupleProducer.
+     *
+     * @param declarer The OuputFieldsDeclarer for the spout.
+     */
+    void declareOutputFields(OutputFieldsDeclarer declarer);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java b/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
new file mode 100644
index 0000000..d691e75
--- /dev/null
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.bolt;
+
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.jms.JmsMessageProducer;
+import org.apache.storm.jms.JmsProvider;
+import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
+import org.apache.storm.utils.TupleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * A JmsBolt receives <code>org.apache.storm.tuple.Tuple</code> objects from a Storm
+ * topology and publishes JMS Messages to a destination (topic or queue).
+ * <p>
+ * To use a JmsBolt in a topology, the following must be supplied:
+ * <ol>
+ * <li>A <code>JmsProvider</code> implementation.</li>
+ * <li>A <code>JmsMessageProducer</code> implementation.</li>
+ * </ol>
+ * The <code>JmsProvider</code> provides the JMS <code>javax.jms.ConnectionFactory</code>
+ * and <code>javax.jms.Destination</code> objects requied to publish JMS messages.
+ * <p>
+ * The JmsBolt uses a <code>JmsMessageProducer</code> to translate
+ * <code>org.apache.storm.tuple.Tuple</code> objects into
+ * <code>javax.jms.Message</code> objects for publishing.
+ * <p>
+ * Both JmsProvider and JmsMessageProducer must be set, or the bolt will
+ * fail upon deployment to a cluster.
+ * <p>
+ * The JmsBolt is typically an endpoint in a topology -- in other words
+ * it does not emit any tuples.
+ */
+public class JmsBolt extends BaseTickTupleAwareRichBolt {
+    private static Logger LOG = LoggerFactory.getLogger(JmsBolt.class);
+
+    private boolean autoAck = true;
+
+    // javax.jms objects
+    private Connection connection;
+    private Session session;
+    private MessageProducer messageProducer;
+
+    // JMS options
+    private boolean jmsTransactional = false;
+    private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+
+
+    private JmsProvider jmsProvider;
+    private JmsMessageProducer producer;
+
+
+    private OutputCollector collector;
+
+    /**
+     * Set the JmsProvider used to connect to the JMS destination topic/queue
+     *
+     * @param provider
+     */
+    public void setJmsProvider(JmsProvider provider) {
+        this.jmsProvider = provider;
+    }
+
+    /**
+     * Set the JmsMessageProducer used to convert tuples
+     * into JMS messages.
+     *
+     * @param producer
+     */
+    public void setJmsMessageProducer(JmsMessageProducer producer) {
+        this.producer = producer;
+    }
+
+    /**
+     * Sets the JMS acknowledgement mode for JMS messages sent
+     * by this bolt.
+     * <p>
+     * Possible values:
+     * <ul>
+     * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
+     * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
+     * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
+     * </ul>
+     *
+     * @param acknowledgeMode (constant defined in javax.jms.Session)
+     */
+    public void setJmsAcknowledgeMode(int acknowledgeMode) {
+        this.jmsAcknowledgeMode = acknowledgeMode;
+    }
+
+    /**
+     * Set the JMS transactional setting for the JMS session.
+     *
+     * @param transactional
+     */
+//	public void setJmsTransactional(boolean transactional){
+//		this.jmsTransactional = transactional;
+//	}
+
+    /**
+     * Sets whether or not tuples should be acknowledged by this
+     * bolt.
+     * <p>
+     *
+     * @param autoAck
+     */
+    public void setAutoAck(boolean autoAck) {
+        this.autoAck = autoAck;
+    }
+
+
+    /**
+     * Consumes a tuple and sends a JMS message.
+     * <p>
+     * If autoAck is true, the tuple will be acknowledged
+     * after the message is sent.
+     * <p>
+     * If JMS sending fails, the tuple will be failed.
+     */
+    @Override
+    protected void process(Tuple input) {
+        // write the tuple to a JMS destination...
+        LOG.debug("Tuple received. Sending JMS message.");
+
+        try {
+            Message msg = this.producer.toMessage(this.session, input);
+            if (msg != null) {
+                if (msg.getJMSDestination() != null) {
+                    this.messageProducer.send(msg.getJMSDestination(), msg);
+                } else {
+                    this.messageProducer.send(msg);
+                }
+            }
+            if (this.autoAck) {
+                LOG.debug("ACKing tuple: " + input);
+                this.collector.ack(input);
+            }
+        } catch (JMSException e) {
+            // failed to send the JMS message, fail the tuple fast
+            LOG.warn("Failing tuple: " + input);
+            LOG.warn("Exception: ", e);
+            this.collector.fail(input);
+        }
+    }
+
+    /**
+     * Releases JMS resources.
+     */
+    @Override
+    public void cleanup() {
+        try {
+            LOG.debug("Closing JMS connection.");
+            this.session.close();
+            this.connection.close();
+        } catch (JMSException e) {
+            LOG.warn("Error closing JMS connection.", e);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    }
+
+    /**
+     * Initializes JMS resources.
+     */
+    @Override
+    public void prepare(Map stormConf, TopologyContext context,
+                        OutputCollector collector) {
+        if (this.jmsProvider == null || this.producer == null) {
+            throw new IllegalStateException("JMS Provider and MessageProducer not set.");
+        }
+        this.collector = collector;
+        LOG.debug("Connecting JMS..");
+        try {
+            ConnectionFactory cf = this.jmsProvider.connectionFactory();
+            Destination dest = this.jmsProvider.destination();
+            this.connection = cf.createConnection();
+            this.session = connection.createSession(this.jmsTransactional,
+                    this.jmsAcknowledgeMode);
+            this.messageProducer = session.createProducer(dest);
+
+            connection.start();
+        } catch (Exception e) {
+            LOG.warn("Error creating JMS connection.", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java
new file mode 100644
index 0000000..b78a41e
--- /dev/null
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.spout;
+
+import java.io.Serializable;
+
+public class JmsMessageID implements Comparable<JmsMessageID>, Serializable {
+
+    private String jmsID;
+
+    private Long sequence;
+
+    public JmsMessageID(long sequence, String jmsID){
+        this.jmsID = jmsID;
+        this.sequence = sequence;
+    }
+
+
+    public String getJmsID(){
+        return this.jmsID;
+    }
+
+    @Override
+    public int compareTo(JmsMessageID jmsMessageID) {
+        return (int)(this.sequence - jmsMessageID.sequence);
+    }
+
+    @Override
+    public int hashCode() {
+        return this.sequence.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if(o instanceof JmsMessageID){
+            JmsMessageID id = (JmsMessageID)o;
+            return this.jmsID.equals(id.jmsID);
+        } else {
+            return false;
+        }
+    }
+
+}


Mime
View raw message