activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [27/51] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 - Rename packages to ActiveMQ
Date Tue, 11 Nov 2014 18:41:57 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ConnectionLifeCycleListener.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ConnectionLifeCycleListener.java b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ConnectionLifeCycleListener.java
new file mode 100644
index 0000000..6d14138
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ConnectionLifeCycleListener.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.spi.core.remoting;
+
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.core.server.HornetQComponent;
+
+/**
+ * A ConnectionLifeCycleListener is called by the remoting implementation to notify of connection events.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public interface ConnectionLifeCycleListener
+{
+   /**
+    * This method is used both by client connector creation and server connection creation through
+    * acceptors. On the client side the {@code component} parameter is normally passed as
+    * {@code null}.
+    * <p>
+    * Leaving this method here and adding a different one at
+    * {@code ServerConnectionLifeCycleListener} is a compromise for a reasonable split between the
+    * hornetq-server and hornetq-client packages while avoiding to pull too much into hornetq-core.
+    * The pivotal point keeping us from removing the method is {@link ConnectorFactory} and the
+    * usage of it.
+    * @param component This will probably be an {@code Acceptor} and only used on the server side.
+    * @param connection the connection that has been created
+    * @param protocol the messaging protocol type this connection uses
+    */
+   void connectionCreated(HornetQComponent component, Connection connection, String protocol);
+
+   /**
+    * Called when a connection is destroyed.
+    * @param connectionID the connection being destroyed.
+    */
+   void connectionDestroyed(Object connectionID);
+
+
+   /**
+    * Called when an error occurs on the connection.
+    * @param connectionID the id of the connection.
+    * @param me the exception.
+    */
+   void connectionException(Object connectionID, HornetQException me);
+
+   void connectionReadyForWrites(Object connectionID, boolean ready);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/Connector.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/Connector.java b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/Connector.java
new file mode 100644
index 0000000..349213b
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/Connector.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.spi.core.remoting;
+
+import java.util.Map;
+
+/**
+ * A Connector is used by the client for creating and controlling a connection.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author Clebert Suconic
+ */
+public interface Connector
+{
+   /**
+    * starts the connector
+    */
+   void start();
+
+   /**
+    * closes the connector
+    */
+   void close();
+
+   /**
+    * returns true if the connector is started, otherwise false.
+    *
+    * @return true if the connector is started
+    */
+   boolean isStarted();
+
+   /**
+    * Create and return a connection from this connector.
+    * <p>
+    * This method must NOT throw an exception if it fails to create the connection
+    * (e.g. network is not available), in this case it MUST return null
+    *
+    * @return The connection, or null if unable to create a connection (e.g. network is unavailable)
+    */
+   Connection createConnection();
+
+   /**
+    * If the configuration is equivalent to this connector, which means
+    * if the parameter configuration is used to create a connection to a target
+    * node, it will be the same node as of the connections made with this connector.
+    * @param configuration
+    * @return true means the configuration is equivalent to the connector. false otherwise.
+    */
+   boolean isEquivalent(Map<String, Object> configuration);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ConnectorFactory.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ConnectorFactory.java b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ConnectorFactory.java
new file mode 100644
index 0000000..b309a75
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ConnectorFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.spi.core.remoting;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.activemq6.api.core.TransportConfigurationHelper;
+
+/**
+ * A ConnectorFactory is used by the client for creating connectors.
+ * <p>
+ * A Connector is used to connect to an {@link org.apache.activemq6.spi.core.remoting.Acceptor}.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public interface ConnectorFactory extends TransportConfigurationHelper
+{
+   /**
+    * creates a new instance of a connector.
+    *
+    * @param configuration       the configuration
+    * @param handler             the handler
+    * @param listener            the listener
+    * @param closeExecutor       the close executor
+    * @param threadPool          the thread pool
+    * @param scheduledThreadPool the scheduled thread pool
+    * @return a new connector
+    */
+   Connector createConnector(Map<String, Object> configuration,
+                             BufferHandler handler,
+                             ConnectionLifeCycleListener listener,
+                             Executor closeExecutor,
+                             Executor threadPool,
+                             ScheduledExecutorService scheduledThreadPool,
+                             ClientProtocolManager protocolManager);
+
+   /**
+    * Returns the allowable properties for this connector.
+    * <p>
+    * This will differ between different connector implementations.
+    *
+    * @return the allowable properties.
+    */
+   Set<String> getAllowableProperties();
+
+   /**
+    * Indicates if connectors from this factory are reliable or not. If a connector is reliable then connection
+    * monitoring (i.e. pings/pongs) will be disabled.
+    *
+    * @return whether or not connectors from this factory are reliable
+    */
+   boolean isReliable();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ConsumerContext.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ConsumerContext.java b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ConsumerContext.java
new file mode 100644
index 0000000..51e4155
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ConsumerContext.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.spi.core.remoting;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public abstract class ConsumerContext
+{
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ReadyListener.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ReadyListener.java b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ReadyListener.java
new file mode 100644
index 0000000..7829410
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ReadyListener.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.spi.core.remoting;
+
+/**
+ * A ReadyListener
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface ReadyListener
+{
+   void readyForWriting(boolean ready);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/SessionContext.java b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/SessionContext.java
new file mode 100644
index 0000000..abc8657
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/SessionContext.java
@@ -0,0 +1,268 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.spi.core.remoting;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.Xid;
+import java.util.HashMap;
+import java.util.concurrent.Executor;
+
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.Message;
+import org.apache.activemq6.api.core.SimpleString;
+import org.apache.activemq6.api.core.client.ClientConsumer;
+import org.apache.activemq6.api.core.client.ClientSession;
+import org.apache.activemq6.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq6.core.client.impl.ClientConsumerInternal;
+import org.apache.activemq6.core.client.impl.ClientLargeMessageInternal;
+import org.apache.activemq6.core.client.impl.ClientMessageInternal;
+import org.apache.activemq6.core.client.impl.ClientProducerCreditsImpl;
+import org.apache.activemq6.core.client.impl.ClientSessionInternal;
+import org.apache.activemq6.core.message.impl.MessageInternal;
+import org.apache.activemq6.spi.core.protocol.RemotingConnection;
+import org.apache.activemq6.utils.IDGenerator;
+import org.apache.activemq6.utils.SimpleIDGenerator;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public abstract class SessionContext
+{
+   protected ClientSessionInternal session;
+
+   protected SendAcknowledgementHandler sendAckHandler;
+
+   protected volatile RemotingConnection remotingConnection;
+
+   protected final IDGenerator idGenerator = new SimpleIDGenerator(0);
+
+
+   public SessionContext(RemotingConnection remotingConnection)
+   {
+      this.remotingConnection = remotingConnection;
+   }
+
+
+   public ClientSessionInternal getSession()
+   {
+      return session;
+   }
+
+   public void setSession(ClientSessionInternal session)
+   {
+      this.session = session;
+   }
+
+   /**
+    * it will either reattach or reconnect, preferably reattaching it.
+    *
+    * @param newConnection
+    * @return true if it was possible to reattach
+    * @throws HornetQException
+    */
+   public abstract boolean reattachOnNewConnection(RemotingConnection newConnection) throws HornetQException;
+
+   public RemotingConnection getRemotingConnection()
+   {
+      return remotingConnection;
+   }
+
+
+   public abstract void closeConsumer(ClientConsumer consumer) throws HornetQException;
+
+   public abstract void sendConsumerCredits(ClientConsumer consumer, int credits);
+
+   public abstract boolean supportsLargeMessage();
+
+   protected void handleReceiveLargeMessage(ConsumerContext consumerID, ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception
+   {
+      ClientSessionInternal session = this.session;
+      if (session != null)
+      {
+         session.handleReceiveLargeMessage(consumerID, clientLargeMessage, largeMessageSize);
+      }
+   }
+
+   protected void handleReceiveMessage(ConsumerContext consumerID, final ClientMessageInternal message) throws Exception
+   {
+
+      ClientSessionInternal session = this.session;
+      if (session != null)
+      {
+         session.handleReceiveMessage(consumerID, message);
+      }
+   }
+
+   protected void handleReceiveContinuation(final ConsumerContext consumerID, byte[] chunk, int flowControlSize, boolean isContinues) throws Exception
+   {
+      ClientSessionInternal session = this.session;
+      if (session != null)
+      {
+         session.handleReceiveContinuation(consumerID, chunk, flowControlSize, isContinues);
+      }
+   }
+
+   protected void handleReceiveProducerCredits(SimpleString address, int credits)
+   {
+      ClientSessionInternal session = this.session;
+      if (session != null)
+      {
+         session.handleReceiveProducerCredits(address, credits);
+      }
+
+   }
+
+   protected void handleReceiveProducerFailCredits(SimpleString address, int credits)
+   {
+      ClientSessionInternal session = this.session;
+      if (session != null)
+      {
+         session.handleReceiveProducerFailCredits(address, credits);
+      }
+
+   }
+
+   public abstract int getCreditsOnSendingFull(MessageInternal msgI);
+
+   public abstract void sendFullMessage(MessageInternal msgI, boolean sendBlocking, SendAcknowledgementHandler handler, SimpleString defaultAddress) throws HornetQException;
+
+   /**
+    * it should return the number of credits (or bytes) used to send this packet
+    *
+    * @param msgI
+    * @return
+    * @throws HornetQException
+    */
+   public abstract int sendInitialChunkOnLargeMessage(MessageInternal msgI) throws HornetQException;
+
+
+   public abstract int sendLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws HornetQException;
+
+
+   public abstract void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler);
+
+   public abstract void createSharedQueue(SimpleString address,
+                                          SimpleString queueName,
+                                          SimpleString filterString,
+                                          boolean durable) throws HornetQException;
+
+   public abstract void deleteQueue(SimpleString queueName) throws HornetQException;
+
+   public abstract void createQueue(SimpleString address, SimpleString queueName, SimpleString filterString, boolean durable, boolean temp) throws HornetQException;
+
+   public abstract ClientSession.QueueQuery queueQuery(SimpleString queueName) throws HornetQException;
+
+   public abstract void forceDelivery(ClientConsumer consumer, long sequence) throws HornetQException;
+
+   public abstract ClientSession.AddressQuery addressQuery(final SimpleString address) throws HornetQException;
+
+   public abstract void simpleCommit() throws HornetQException;
+
+
+   /**
+    * If we are doing a simple rollback on the RA, we need to ack the last message sent to the consumer,
+    * otherwise DLQ won't work.
+    * <p>
+    * this is because we only ACK after on the RA, We may review this if we always acked earlier.
+    *
+    * @param lastMessageAsDelivered
+    * @throws HornetQException
+    */
+   public abstract void simpleRollback(boolean lastMessageAsDelivered) throws HornetQException;
+
+   public abstract void sessionStart() throws HornetQException;
+
+   public abstract void sessionStop() throws HornetQException;
+
+   public abstract void sendACK(boolean individual, boolean block, final ClientConsumer consumer, final Message message) throws HornetQException;
+
+   public abstract void expireMessage(final ClientConsumer consumer, Message message) throws HornetQException;
+
+   public abstract void sessionClose() throws HornetQException;
+
+   public abstract void addSessionMetadata(String key, String data) throws HornetQException;
+
+   public abstract void addUniqueMetaData(String key, String data) throws HornetQException;
+
+   public abstract void sendProducerCreditsMessage(final int credits, final SimpleString address);
+
+   public abstract void xaCommit(Xid xid, boolean onePhase) throws XAException, HornetQException;
+
+   public abstract void xaEnd(Xid xid, int flags) throws XAException, HornetQException;
+
+   public abstract void xaForget(Xid xid) throws XAException, HornetQException;
+
+   public abstract int xaPrepare(Xid xid) throws XAException, HornetQException;
+
+   public abstract Xid[] xaScan() throws HornetQException;
+
+   public abstract void xaRollback(Xid xid, boolean wasStarted) throws HornetQException, XAException;
+
+   public abstract void xaStart(Xid xid, int flags) throws XAException, HornetQException;
+
+   public abstract boolean configureTransactionTimeout(int seconds) throws HornetQException;
+
+   public abstract ClientConsumerInternal createConsumer(SimpleString queueName, SimpleString filterString, int windowSize, int maxRate, int ackBatchSize, boolean browseOnly,
+                                                         Executor executor, Executor flowControlExecutor) throws HornetQException;
+
+   /**
+    * Performs a round trip to the server requesting what is the current tx timeout on the session
+    *
+    * @return
+    */
+   public abstract int recoverSessionTimeout() throws HornetQException;
+
+   public abstract int getServerVersion();
+
+   public abstract void recreateSession(final String username,
+                                        final String password,
+                                        final int minLargeMessageSize,
+                                        final boolean xa,
+                                        final boolean autoCommitSends,
+                                        final boolean autoCommitAcks,
+                                        final boolean preAcknowledge,
+                                        final SimpleString defaultAddress) throws HornetQException;
+
+
+   public abstract void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws HornetQException;
+
+   public abstract void xaFailed(Xid xid) throws HornetQException;
+
+   public abstract void restartSession() throws HornetQException;
+
+   public abstract void resetMetadata(HashMap<String, String> metaDataToSend);
+
+
+   // Failover utility classes
+
+   /**
+    * Interrupt and return any blocked calls
+    */
+   public abstract void returnBlocking(HornetQException cause);
+
+   /**
+    * it will lock the communication channel of the session avoiding anything to come while failover is happening.
+    * It happens on preFailover from ClientSessionImpl
+    */
+   public abstract void lockCommunications();
+
+
+   public abstract void releaseCommunications();
+
+   public abstract void cleanup();
+
+
+   public abstract void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/TopologyResponseHandler.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/TopologyResponseHandler.java b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/TopologyResponseHandler.java
new file mode 100644
index 0000000..05ef1be
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/TopologyResponseHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.spi.core.remoting;
+
+import org.apache.activemq6.api.core.Pair;
+import org.apache.activemq6.api.core.TransportConfiguration;
+import org.apache.activemq6.spi.core.protocol.RemotingConnection;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public interface TopologyResponseHandler
+{
+   // This is sent when the server is telling the client the node is being disconnected
+   void nodeDisconnected(RemotingConnection conn, String nodeID, String scaleDownTargetNodeID);
+
+   void notifyNodeUp(long uniqueEventID,
+                     final String backupGroupName,
+                     final String scaleDownGroupName,
+                     final String nodeName,
+                     final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                     final boolean isLast);
+
+   // This is sent when any node on the cluster topology is going down
+   void notifyNodeDown(final long eventTime, final String nodeID);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/package-info.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/package-info.java b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/package-info.java
new file mode 100644
index 0000000..f6cebeb
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+/**
+ * Remoting SPI.
+ * <br>
+ * This package defines the Service Provider Interface that
+ * remoting providers must implement to be supported by HornetQ.
+ */
+package org.apache.activemq6.spi.core.remoting;
+

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/utils/BufferHelper.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/utils/BufferHelper.java b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/BufferHelper.java
new file mode 100644
index 0000000..fb19e41
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/BufferHelper.java
@@ -0,0 +1,193 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.utils;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.SimpleString;
+
+/**
+ * Helper methods to read and write from HornetQBuffer.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class BufferHelper
+{
+
+   /** Size of a String as if it was a Nullable Simple String */
+   public static int sizeOfNullableSimpleString(String str)
+   {
+      if (str == null)
+      {
+         return DataConstants.SIZE_BOOLEAN;
+      }
+      else
+      {
+         return DataConstants.SIZE_BOOLEAN + sizeOfSimpleString(str);
+      }
+   }
+
+   /** Size of a String as if it was a Simple String */
+   public static int sizeOfSimpleString(String str)
+   {
+      return DataConstants.SIZE_INT + str.length() * 2;
+   }
+
+   public static void writeAsNullableSimpleString(HornetQBuffer buffer, String str)
+   {
+      buffer.writeNullableSimpleString(SimpleString.toSimpleString(str));
+   }
+
+   public static String readNullableSimpleStringAsString(HornetQBuffer buffer)
+   {
+      SimpleString str = buffer.readNullableSimpleString();
+      return str != null ? str.toString() : null;
+   }
+
+   public static void writeAsSimpleString(HornetQBuffer buffer, String str)
+   {
+      buffer.writeSimpleString(new SimpleString(str));
+   }
+
+   /**
+    * @param buffer
+    */
+   public static void writeNullableBoolean(HornetQBuffer buffer, Boolean value)
+   {
+      buffer.writeBoolean(value != null);
+
+      if (value != null)
+      {
+         buffer.writeBoolean(value.booleanValue());
+      }
+   }
+
+   public static int sizeOfNullableBoolean(Boolean value)
+   {
+      return DataConstants.SIZE_BOOLEAN + (value != null ? DataConstants.SIZE_BOOLEAN : 0);
+   }
+
+   public static Boolean readNullableBoolean(HornetQBuffer buffer)
+   {
+      boolean isNotNull = buffer.readBoolean();
+
+      if (isNotNull)
+      {
+         return buffer.readBoolean();
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+   /**
+    * @param buffer
+    */
+   public static void writeNullableLong(HornetQBuffer buffer, Long value)
+   {
+      buffer.writeBoolean(value != null);
+
+      if (value != null)
+      {
+         buffer.writeLong(value.longValue());
+      }
+   }
+
+   /**
+    * @param buffer
+    */
+   public static void writeNullableDouble(HornetQBuffer buffer, Double value)
+   {
+      buffer.writeBoolean(value != null);
+
+      if (value != null)
+      {
+         buffer.writeDouble(value.doubleValue());
+      }
+   }
+
+   public static int sizeOfNullableLong(Long value)
+   {
+      return DataConstants.SIZE_BOOLEAN + (value != null ? DataConstants.SIZE_LONG : 0);
+   }
+
+   public static int sizeOfNullableDouble(Double value)
+   {
+      return DataConstants.SIZE_BOOLEAN + (value != null ? DataConstants.SIZE_DOUBLE : 0);
+   }
+
+
+   public static Long readNullableLong(HornetQBuffer buffer)
+   {
+      boolean isNotNull = buffer.readBoolean();
+
+      if (isNotNull)
+      {
+         return buffer.readLong();
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+   /**
+    * @param buffer
+    */
+   public static void writeNullableInteger(HornetQBuffer buffer, Integer value)
+   {
+      buffer.writeBoolean(value != null);
+
+      if (value != null)
+      {
+         buffer.writeInt(value.intValue());
+      }
+   }
+
+   public static int sizeOfNullableInteger(Integer value)
+   {
+      return DataConstants.SIZE_BOOLEAN + (value != null ? DataConstants.SIZE_INT : 0);
+   }
+
+   public static Integer readNullableInteger(HornetQBuffer buffer)
+   {
+      boolean isNotNull = buffer.readBoolean();
+
+      if (isNotNull)
+      {
+         return buffer.readInt();
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+   public static Double readNullableDouble(HornetQBuffer buffer)
+   {
+      boolean isNotNull = buffer.readBoolean();
+
+      if (isNotNull)
+      {
+         return buffer.readDouble();
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/utils/ConfigurationHelper.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/utils/ConfigurationHelper.java b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/ConfigurationHelper.java
new file mode 100644
index 0000000..bf619cf
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/ConfigurationHelper.java
@@ -0,0 +1,250 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.utils;
+
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.core.client.HornetQClientLogger;
+import org.apache.activemq6.core.client.HornetQClientMessageBundle;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * Static helpers for reading typed values out of a {@code Map<String, Object>} of
+ * configuration properties, and for validating the key sets of such maps.
+ * <p>
+ * Values may arrive as Strings (the resource adapter always sends Strings), so the numeric
+ * and boolean getters parse String values in addition to accepting the natural type.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public class ConfigurationHelper
+{
+   /**
+    * Returns the named property as a String.
+    *
+    * @param propName the key to look up
+    * @param def the value returned when {@code props} is null or holds no value for the key
+    * @param props the property map; may be null
+    * @return the value itself when it is a String, otherwise its {@code toString()}
+    */
+   public static String getStringProperty(final String propName, final String def, final Map<String, Object> props)
+   {
+      if (props == null)
+      {
+         return def;
+      }
+
+      final Object prop = props.get(propName);
+
+      if (prop == null)
+      {
+         return def;
+      }
+
+      return prop instanceof String ? (String)prop : prop.toString();
+   }
+
+   /**
+    * Returns the named property as an int.
+    * <p>
+    * A String value is parsed; a {@link Number} is narrowed with {@code intValue()}; any
+    * other type is reported through the client logger and {@code def} is returned.
+    *
+    * @param propName the key to look up
+    * @param def the value returned when the property is absent or of an unsupported type
+    * @param props the property map; may be null
+    * @return the int value of the property, or {@code def}
+    * @throws NumberFormatException if the value is a String that is not a parsable int
+    */
+   public static int getIntProperty(final String propName, final int def, final Map<String, Object> props)
+   {
+      if (props == null)
+      {
+         return def;
+      }
+
+      final Object prop = props.get(propName);
+
+      if (prop == null)
+      {
+         return def;
+      }
+
+      // The resource adapter will always send Strings, hence the conversion here
+      if (prop instanceof String)
+      {
+         return Integer.parseInt((String)prop);
+      }
+
+      if (prop instanceof Number)
+      {
+         return ((Number)prop).intValue();
+      }
+
+      HornetQClientLogger.LOGGER.propertyNotInteger(propName, prop.getClass().getName());
+
+      return def;
+   }
+
+   /**
+    * Returns the named property as a long.
+    * <p>
+    * A String value is parsed; a {@link Number} is converted with {@code longValue()}; any
+    * other type is reported through the client logger and {@code def} is returned.
+    *
+    * @param propName the key to look up
+    * @param def the value returned when the property is absent or of an unsupported type
+    * @param props the property map; may be null
+    * @return the long value of the property, or {@code def}
+    * @throws NumberFormatException if the value is a String that is not a parsable long
+    */
+   public static long getLongProperty(final String propName, final long def, final Map<String, Object> props)
+   {
+      if (props == null)
+      {
+         return def;
+      }
+
+      final Object prop = props.get(propName);
+
+      if (prop == null)
+      {
+         return def;
+      }
+
+      // The resource adapter will always send Strings, hence the conversion here
+      if (prop instanceof String)
+      {
+         return Long.parseLong((String)prop);
+      }
+
+      if (prop instanceof Number)
+      {
+         return ((Number)prop).longValue();
+      }
+
+      HornetQClientLogger.LOGGER.propertyNotLong(propName, prop.getClass().getName());
+
+      return def;
+   }
+
+   /**
+    * Returns the named property as a boolean.
+    * <p>
+    * A String value is parsed with the rules of {@link Boolean#parseBoolean(String)} (only a
+    * case-insensitive "true" yields {@code true}); a {@link Boolean} is returned as is; any
+    * other type is reported through the client logger and {@code def} is returned.
+    *
+    * @param propName the key to look up
+    * @param def the value returned when the property is absent or of an unsupported type
+    * @param props the property map; may be null
+    * @return the boolean value of the property, or {@code def}
+    */
+   public static boolean getBooleanProperty(final String propName, final boolean def, final Map<String, Object> props)
+   {
+      if (props == null)
+      {
+         return def;
+      }
+
+      final Object prop = props.get(propName);
+
+      if (prop == null)
+      {
+         return def;
+      }
+
+      // The resource adapter will always send Strings, hence the conversion here
+      if (prop instanceof String)
+      {
+         return Boolean.parseBoolean((String)prop);
+      }
+
+      if (prop instanceof Boolean)
+      {
+         return (Boolean)prop;
+      }
+
+      HornetQClientLogger.LOGGER.propertyNotBoolean(propName, prop.getClass().getName());
+
+      return def;
+   }
+
+   /**
+    * Finds the keys that are not allowed.
+    *
+    * @param allowableKeys the keys that are permitted
+    * @param keys the keys to validate
+    * @return a new set holding every element of {@code keys} that is absent from
+    *         {@code allowableKeys}; empty when all keys are allowed
+    */
+   public static Set<String> checkKeys(final Set<String> allowableKeys, final Set<String> keys)
+   {
+      final Set<String> invalid = new HashSet<String>();
+
+      for (String key : keys)
+      {
+         if (!allowableKeys.contains(key))
+         {
+            invalid.add(key);
+         }
+      }
+      return invalid;
+   }
+
+   /**
+    * Finds the required keys that are missing.
+    *
+    * @param requiredKeys the keys that must be present
+    * @param keys the keys actually supplied
+    * @return a new set holding every element of {@code requiredKeys} that is absent from
+    *         {@code keys}; empty when nothing is missing
+    */
+   public static Set<String> checkKeysExist(final Set<String> requiredKeys, final Set<String> keys)
+   {
+      // Start from "everything is missing" and strike out each supplied key; removing a key
+      // that was never required is a no-op, so no membership test is needed first.
+      final Set<String> missing = new HashSet<String>(requiredKeys);
+
+      for (String key : keys)
+      {
+         missing.remove(key);
+      }
+      return missing;
+   }
+
+   /**
+    * Joins the elements of a set with {@code ", "}, in the set's iteration order.
+    *
+    * @param invalid the strings to join
+    * @return the joined text; the empty string for an empty set
+    */
+   public static String stringSetToCommaListString(final Set<String> invalid)
+   {
+      final StringBuilder sb = new StringBuilder();
+      String separator = "";
+      for (String key : invalid)
+      {
+         sb.append(separator).append(key);
+         separator = ", ";
+      }
+      return sb.toString();
+   }
+
+   /**
+    * Returns the named password property, decoding it when masking is switched on.
+    * <p>
+    * Masking is on when the property named by {@code defaultMaskPassword} is true. In that
+    * case the property named by {@code defaultPasswordCodec} must hold the codec class name;
+    * the codec is obtained from {@link PasswordMaskingUtil} and used to decode the value.
+    * Failures (no codec configured, codec cannot be obtained, decoding error) are raised as
+    * the exceptions built by {@code HornetQClientMessageBundle}.
+    *
+    * @param propName the key of the password property
+    * @param def the value returned when {@code props} is null or holds no value for the key
+    * @param props the property map; may be null
+    * @param defaultMaskPassword the key in {@code props} of the mask-password flag
+    * @param defaultPasswordCodec the key in {@code props} of the codec class name
+    * @return the password, decoded when masking is on, otherwise as stored
+    */
+   public static String getPasswordProperty(final String propName,
+         final String def, final Map<String, Object> props,
+         String defaultMaskPassword, String defaultPasswordCodec)
+   {
+      if (props == null)
+      {
+         return def;
+      }
+
+      final Object prop = props.get(propName);
+
+      if (prop == null)
+      {
+         return def;
+      }
+
+      final String value = prop.toString();
+
+      // Read the flag through getBooleanProperty instead of a raw (Boolean) cast: the
+      // resource adapter sends Strings, and a String "true" used to fail with a
+      // ClassCastException here.
+      if (!getBooleanProperty(defaultMaskPassword, false, props))
+      {
+         return value;
+      }
+
+      // Same reasoning as above: tolerate a non-String value rather than casting.
+      final String classImpl = getStringProperty(defaultPasswordCodec, null, props);
+
+      if (classImpl == null)
+      {
+         throw HornetQClientMessageBundle.BUNDLE.noCodec();
+      }
+
+      SensitiveDataCodec<String> codec = null;
+      try
+      {
+         codec = PasswordMaskingUtil.getCodec(classImpl);
+      }
+      catch (HornetQException e1)
+      {
+         throw HornetQClientMessageBundle.BUNDLE.failedToGetDecoder(e1);
+      }
+
+      try
+      {
+         return codec.decode(value);
+      }
+      catch (Exception e)
+      {
+         throw HornetQClientMessageBundle.BUNDLE.errordecodingPassword(e);
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/utils/ConfirmationWindowWarning.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/utils/ConfirmationWindowWarning.java b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/ConfirmationWindowWarning.java
new file mode 100644
index 0000000..85d7034
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/ConfirmationWindowWarning.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+/**
+ *
+ */
+package org.apache.activemq6.utils;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Holder pairing an immutable {@code disabled} flag with a mutable {@code warningIssued}
+ * flag.
+ * <p>
+ * TODO: get rid of this
+ */
+public final class ConfirmationWindowWarning
+{
+   /** Value fixed at construction time. */
+   public final boolean disabled;
+
+   /** Created as {@code false}; the same instance is kept for the life of this object. */
+   public final AtomicBoolean warningIssued;
+
+   /**
+    * @param disabled the value stored in {@link #disabled}
+    */
+   public ConfirmationWindowWarning(boolean disabled)
+   {
+      this.warningIssued = new AtomicBoolean(false);
+      this.disabled = disabled;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/utils/DeflaterReader.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/utils/DeflaterReader.java b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/DeflaterReader.java
new file mode 100644
index 0000000..c735fa0
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/DeflaterReader.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.zip.Deflater;
+
+/**
+ * A DeflaterReader
+ * The reader takes an input stream and compresses it with a {@link Deflater}: reading from
+ * this stream yields the compressed form of the bytes pulled from the wrapped stream.
+ * Not for concurrent use.
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
+public class DeflaterReader extends InputStream
+{
+   private final Deflater deflater = new Deflater();
+   // true once finish() has been called on the deflater
+   private boolean isFinished = false;
+   // true once the deflater has produced its last byte and has been ended
+   private boolean compressDone = false;
+
+   private final InputStream input;
+
+   // running count of uncompressed bytes pulled from the wrapped stream; never null
+   private final AtomicLong bytesRead;
+
+   /**
+    * @param inData the stream supplying the uncompressed bytes
+    * @param bytesRead counter incremented by the number of uncompressed bytes consumed; may
+    *           be null, in which case a private counter is used so that
+    *           {@link #getTotalSize()} still works
+    */
+   public DeflaterReader(final InputStream inData, final AtomicLong bytesRead)
+   {
+      input = inData;
+      // read() tolerated a null counter but getTotalSize() dereferenced it; falling back to
+      // an internal counter keeps both paths safe.
+      this.bytesRead = bytesRead != null ? bytesRead : new AtomicLong();
+   }
+
+   /**
+    * Reads one compressed byte.
+    *
+    * @return the byte as a value in 0..255, or -1 when no more compressed data is available
+    */
+   @Override
+   public int read() throws IOException
+   {
+      byte[] buffer = new byte[1];
+      int n = read(buffer, 0, 1);
+      if (n == 1)
+      {
+         return buffer[0] & 0xFF;
+      }
+      if (n == -1 || n == 0)
+      {
+         return -1;
+      }
+      throw new IOException("Error reading data, invalid n: " + n);
+   }
+
+   /**
+    * Try to fill the buffer with compressed bytes. Except the last effective read,
+    * this method always returns with a full buffer of compressed data.
+    * <p>
+    * The call that discovers the end of the compressed data returns the bytes produced so
+    * far, which may be 0; every later call returns -1.
+    *
+    * @param buffer the buffer to fill compressed bytes
+    * @param offset index in {@code buffer} of the first byte to write
+    * @param len maximum number of bytes to write
+    * @return the number of bytes really filled, -1 indicates end.
+    * @throws IOException if reading the wrapped stream fails
+    */
+   @Override
+   public int read(final byte[] buffer, int offset, int len) throws IOException
+   {
+      if (compressDone)
+      {
+         return -1;
+      }
+
+      //buffer for reading input stream
+      byte[] readBuffer = new byte[2 * len];
+
+      int n = 0;
+      int read = 0;
+
+      while (len > 0)
+      {
+         n = deflater.deflate(buffer, offset, len);
+         if (n == 0)
+         {
+            if (isFinished)
+            {
+               // nothing left to emit after finish(): release the deflater and stop
+               deflater.end();
+               compressDone = true;
+               break;
+            }
+            else if (deflater.needsInput())
+            {
+               // read some data from inputstream
+               int m = input.read(readBuffer);
+
+               if (m == -1)
+               {
+                  // source exhausted: let the deflater flush what it still holds
+                  deflater.finish();
+                  isFinished = true;
+               }
+               else
+               {
+                  bytesRead.addAndGet(m);
+                  deflater.setInput(readBuffer, 0, m);
+               }
+            }
+            else
+            {
+               deflater.finish();
+               isFinished = true;
+            }
+         }
+         else
+         {
+            read += n;
+            offset += n;
+            len -= n;
+         }
+      }
+      return read;
+   }
+
+   /**
+    * Closes the wrapped stream and releases the deflater, including when the compressed
+    * data has not been read to the end.
+    *
+    * @throws IOException if closing the wrapped stream fails
+    */
+   public void closeStream() throws IOException
+   {
+      try
+      {
+         super.close();
+         input.close();
+      }
+      finally
+      {
+         // end() may already have been called by read(); calling it again is harmless
+         deflater.end();
+      }
+   }
+
+   /**
+    * @return the number of uncompressed bytes pulled from the wrapped stream so far, as
+    *         accumulated in the counter (a caller-supplied counter may carry a prior value)
+    */
+   public long getTotalSize()
+   {
+      return bytesRead.get();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/utils/ExecutorFactory.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/utils/ExecutorFactory.java b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/ExecutorFactory.java
new file mode 100644
index 0000000..b2d00f2
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/ExecutorFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.utils;
+
+import java.util.concurrent.Executor;
+
+/**
+ *
+ * An ExecutorFactory: a source of {@link Executor} instances.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public interface ExecutorFactory
+{
+   /**
+    * @return an executor. NOTE(review): whether each call yields a new or a shared instance
+    *         is decided by the implementation and cannot be told from this interface.
+    */
+   Executor getExecutor();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/utils/FutureLatch.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/utils/FutureLatch.java b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/FutureLatch.java
new file mode 100644
index 0000000..c4fa561
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/FutureLatch.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.utils;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link Runnable} wrapping a {@link CountDownLatch}: every {@link #run()} counts the
+ * latch down once, and {@link #await(long)} waits for the count to reach zero.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public class FutureLatch implements Runnable
+{
+   private final CountDownLatch latch;
+
+   /**
+    * Creates a latch released by a single {@link #run()}.
+    */
+   public FutureLatch()
+   {
+      super();
+      latch = new CountDownLatch(1);
+   }
+
+   /**
+    * @param latches number of {@link #run()} invocations needed to release the latch
+    */
+   public FutureLatch(int latches)
+   {
+      super();
+      latch =  new CountDownLatch(latches);
+   }
+
+   /**
+    * Waits for the latch to be released.
+    *
+    * @param timeout maximum time to wait, in milliseconds
+    * @return {@code true} if the count reached zero in time; {@code false} on timeout or if
+    *         the waiting thread was interrupted (its interrupt status is then left set)
+    */
+   public boolean await(final long timeout)
+   {
+      try
+      {
+         return latch.await(timeout, TimeUnit.MILLISECONDS);
+      }
+      catch (InterruptedException e)
+      {
+         // The signature cannot propagate the interruption, so restore the flag instead of
+         // silently dropping it; code further up the stack can still observe it.
+         Thread.currentThread().interrupt();
+         return false;
+      }
+   }
+
+   /**
+    * Counts the latch down by one.
+    */
+   public void run()
+   {
+      latch.countDown();
+   }
+
+   @Override
+   public String toString()
+   {
+      return FutureLatch.class.getSimpleName() + "(latch=" + latch + ")";
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/utils/HornetQBufferInputStream.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/utils/HornetQBufferInputStream.java b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/HornetQBufferInputStream.java
new file mode 100644
index 0000000..cb2edde
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/HornetQBufferInputStream.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+
+/**
+ * Used to send large messages
+ * <p>
+ * An {@link InputStream} view over the readable bytes of a {@link HornetQBuffer}, i.e. the
+ * bytes between its reader index and its writer index. Reading advances the buffer's reader
+ * index. Mark/reset is not supported.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class HornetQBufferInputStream extends InputStream
+{
+   // Attributes ----------------------------------------------------
+
+   // the wrapped buffer; set to null by close()
+   private HornetQBuffer bb;
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param paramByteBuffer the buffer to read from
+    */
+   public HornetQBufferInputStream(final HornetQBuffer paramByteBuffer)
+   {
+      bb = paramByteBuffer;
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * @return the next byte as a value in 0..255, or -1 when no readable bytes remain
+    * @throws IOException if the stream has been closed
+    */
+   @Override
+   public int read() throws IOException
+   {
+      ensureOpen("read");
+
+      if (remainingBytes() == 0)
+      {
+         return -1;
+      }
+
+      return bb.readByte() & 0xFF;
+   }
+
+   /**
+    * Same as {@code read(byteArray, 0, byteArray.length)}.
+    *
+    * @throws IOException if the stream has been closed
+    */
+   @Override
+   public int read(final byte[] byteArray) throws IOException
+   {
+      ensureOpen("read");
+
+      return read(byteArray, 0, byteArray.length);
+   }
+
+   /**
+    * Copies up to {@code len} readable bytes into {@code byteArray}.
+    *
+    * @param byteArray destination array
+    * @param off index in {@code byteArray} of the first byte to write
+    * @param len maximum number of bytes to copy
+    * @return the number of bytes copied, 0 when {@code len} is 0, or -1 when no readable
+    *         bytes remain
+    * @throws IOException if the stream has been closed
+    * @throws NullPointerException if {@code byteArray} is null
+    * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not fit the array
+    */
+   @Override
+   public int read(final byte[] byteArray, final int off, final int len) throws IOException
+   {
+      ensureOpen("read");
+
+      if (byteArray == null)
+      {
+         throw new NullPointerException();
+      }
+      // "off + len < 0" catches int overflow of the sum
+      if (off < 0 || off > byteArray.length || len < 0 || off + len > byteArray.length || off + len < 0)
+      {
+         throw new IndexOutOfBoundsException();
+      }
+      if (len == 0)
+      {
+         return 0;
+      }
+
+      final int size = Math.min(remainingBytes(), len);
+
+      if (size == 0)
+      {
+         return -1;
+      }
+
+      bb.readBytes(byteArray, off, size);
+      return size;
+   }
+
+   /**
+    * Skips up to {@code len} readable bytes.
+    *
+    * @param len the number of bytes to skip; values &lt;= 0 skip nothing
+    * @return the number of bytes actually skipped
+    * @throws IOException if the stream has been closed
+    */
+   @Override
+   public long skip(final long len) throws IOException
+   {
+      ensureOpen("skip");
+
+      if (len <= 0L)
+      {
+         return 0L;
+      }
+
+      // Clamp in long arithmetic before narrowing: casting len to int first wraps for
+      // values above Integer.MAX_VALUE and could produce a zero or negative skip count.
+      final int size = (int)Math.min((long)remainingBytes(), len);
+
+      bb.skipBytes(size);
+
+      return size;
+   }
+
+   /**
+    * @return the number of readable bytes left in the buffer
+    * @throws IOException if the stream has been closed
+    */
+   @Override
+   public int available() throws IOException
+   {
+      ensureOpen("available");
+
+      return remainingBytes();
+   }
+
+   /**
+    * Drops the reference to the buffer; later read, skip and available calls throw
+    * {@link IOException}.
+    */
+   @Override
+   public void close() throws IOException
+   {
+      bb = null;
+   }
+
+   /**
+    * No-op: mark/reset is not supported.
+    */
+   @Override
+   public synchronized void mark(final int paramInt)
+   {
+   }
+
+   /**
+    * @throws IOException always, mark/reset is not supported
+    */
+   @Override
+   public synchronized void reset() throws IOException
+   {
+      throw new IOException("mark/reset not supported");
+   }
+
+   /**
+    * @return {@code false}
+    */
+   @Override
+   public boolean markSupported()
+   {
+      return false;
+   }
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Fails when {@link #close()} has already been called.
+    *
+    * @param operation name of the calling operation, used as the start of the message
+    */
+   private void ensureOpen(final String operation) throws IOException
+   {
+      if (bb == null)
+      {
+         throw new IOException(operation + " on a closed InputStream");
+      }
+   }
+
+   /**
+    * @return the distance between the buffer's writer index and its reader index
+    */
+   private int remainingBytes()
+   {
+      return bb.writerIndex() - bb.readerIndex();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/utils/IDGenerator.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/utils/IDGenerator.java b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/IDGenerator.java
new file mode 100644
index 0000000..0d31ff7
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/IDGenerator.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.utils;
+
+/**
+ * Generator of record IDs for the journals.
+ * <p>
+ * Notice that while the bindings and messages journals are independent from one another they use
+ * the same {@link IDGenerator} instance.
+ * <p>
+ * The next recordID should be persisted in the journals during a normal shutdown. The lack of such
+ * a record indicates a server crash. During server restart, if the journals lack a
+ * {@literal next-recordID} record, we use the last recorded ID plus {@code MAX_INT}.
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public interface IDGenerator
+{
+   /**
+    * @return a newly generated record ID
+    */
+   long generateID();
+
+   /**
+    * @return the generator's current ID. NOTE(review): presumably the most recently
+    *         generated value, without advancing the generator; confirm against the
+    *         implementations.
+    */
+   long getCurrentID();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/utils/InflaterReader.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/utils/InflaterReader.java b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/InflaterReader.java
new file mode 100644
index 0000000..13eea8b
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/InflaterReader.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
+
+/**
+ * An InflaterReader
+ * It takes a compressed input stream and decompresses it as it is being read.
+ * Not for concurrent use.
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
+public class InflaterReader extends InputStream
+{
+   private Inflater inflater = new Inflater();
+
+   private InputStream input;
+
+   // decompressed bytes waiting to be handed out by read()
+   private byte[] decoded;
+   // index of the next byte to hand out, or -1 when decoded must be refilled
+   private int position;
+   // number of valid bytes in decoded
+   private int decodedLength;
+
+   /**
+    * Creates a reader with a 1024-byte buffer.
+    *
+    * @param input the stream supplying the compressed bytes
+    */
+   public InflaterReader(InputStream input)
+   {
+      this(input, 1024);
+   }
+
+   /**
+    * @param input the stream supplying the compressed bytes
+    * @param bufferSize size of the internal buffer of decompressed bytes
+    */
+   public InflaterReader(InputStream input, int bufferSize)
+   {
+      this.position = -1;
+      this.decoded = new byte[bufferSize];
+      this.input = input;
+   }
+
+   /**
+    * @return the next decompressed byte as a value in 0..255, or -1 when the inflater
+    *         produces no more data
+    * @throws IOException if the wrapped stream fails, or if the compressed data is invalid
+    *            or ends too early
+    */
+   public int read() throws IOException
+   {
+      if (position == -1 && !refill())
+      {
+         return -1;
+      }
+
+      final int value = decoded[position] & 0xFF;
+      position++;
+      if (position == decodedLength)
+      {
+         // buffer drained: the next call refills it
+         position = -1;
+      }
+
+      return value;
+   }
+
+   /*
+    * Refills the buffer of decompressed bytes.
+    * returns false when no byte could be produced, true otherwise
+    */
+   private boolean refill() throws IOException
+   {
+      try
+      {
+         decodedLength = doRead(decoded, 0, decoded.length);
+      }
+      catch (DataFormatException e)
+      {
+         IOException wrapped = new IOException(e.getMessage());
+         wrapped.initCause(e);
+         throw wrapped;
+      }
+
+      if (decodedLength == 0)
+      {
+         return false;
+      }
+      position = 0;
+      return true;
+   }
+
+   /*
+    * Inflates into buf until len bytes were produced or the inflater is finished,
+    * feeding the inflater from the wrapped stream whenever it asks for input.
+    * returns number of bytes actually got
+    */
+   private int doRead(byte[] buf, int offset, int len) throws DataFormatException, IOException
+   {
+      final byte[] feed = new byte[len];
+      int total = 0;
+
+      while (len > 0)
+      {
+         final int produced = inflater.inflate(buf, offset, len);
+         if (produced > 0)
+         {
+            total += produced;
+            offset += produced;
+            len -= produced;
+            continue;
+         }
+
+         if (inflater.finished())
+         {
+            break;
+         }
+
+         if (!inflater.needsInput())
+         {
+            //it shouldn't be here, throw
+            throw new DataFormatException("Inflater is neither finished nor needing input.");
+         }
+
+         //feeding
+         final int fed = input.read(feed);
+         if (fed == -1)
+         {
+            //it shouldn't be here, throw exception
+            throw new DataFormatException("Input is over while inflater still expecting data");
+         }
+
+         //feed the data in; the next loop pass inflates it
+         inflater.setInput(feed, 0, fed);
+      }
+      return total;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/utils/InflaterWriter.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/utils/InflaterWriter.java b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/InflaterWriter.java
new file mode 100644
index 0000000..e2a4691
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/InflaterWriter.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.utils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
+
+/**
+ * An InflaterWriter
+ * <p>
+ * This class takes an OutputStream. Compressed bytes
+ * can directly be written into this class. The class will
+ * decompress the bytes and write them to the output stream.
+ * <p>
+ * Compressed bytes are collected in a 1024-byte buffer and inflated whenever the buffer
+ * fills up; {@link #close()} inflates whatever is still pending.
+ * <p>
+ * Not for concurrent use.
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
+public class InflaterWriter extends OutputStream
+{
+   private final Inflater inflater = new Inflater();
+
+   private final OutputStream output;
+
+   // compressed bytes not yet handed to the inflater
+   private final byte[] writeBuffer = new byte[1024];
+
+   // number of valid bytes in writeBuffer
+   private int writePointer = 0;
+
+   // scratch space receiving the inflated bytes
+   private final byte[] outputBuffer = new byte[writeBuffer.length * 2];
+
+   /**
+    * @param output the stream receiving the decompressed bytes
+    */
+   public InflaterWriter(final OutputStream output)
+   {
+      this.output = output;
+   }
+
+   /*
+    * Write a compressed byte.
+    */
+   @Override
+   public void write(final int b) throws IOException
+   {
+      writeBuffer[writePointer] = (byte)(b & 0xFF);
+      writePointer++;
+
+      if (writePointer == writeBuffer.length)
+      {
+         writePointer = 0;
+         try
+         {
+            doWrite();
+         }
+         catch (DataFormatException e)
+         {
+            IOException ie = new IOException("Error decompressing data");
+            ie.initCause(e);
+            throw ie;
+         }
+      }
+   }
+
+   /**
+    * Inflates the pending bytes, writes the result, then closes the wrapped stream and
+    * releases the inflater.
+    * <p>
+    * The wrapped stream is closed on every path: also when nothing is pending (for example
+    * when the compressed length is an exact multiple of the buffer size) and when the
+    * pending bytes turn out to be invalid.
+    *
+    * @throws IOException if the pending data cannot be decompressed, or if writing to or
+    *            closing the wrapped stream fails
+    */
+   @Override
+   public void close() throws IOException
+   {
+      try
+      {
+         if (writePointer > 0)
+         {
+            inflater.setInput(writeBuffer, 0, writePointer);
+            // reset first so that a repeated close() does not inflate the same bytes again
+            writePointer = 0;
+            drain();
+         }
+      }
+      catch (DataFormatException e)
+      {
+         IOException io = new IOException(e.getMessage());
+         io.initCause(e);
+         throw io;
+      }
+      finally
+      {
+         inflater.end();
+         output.close();
+      }
+   }
+
+   /*
+    * Inflates a completely filled writeBuffer.
+    */
+   private void doWrite() throws DataFormatException, IOException
+   {
+      inflater.setInput(writeBuffer);
+      drain();
+   }
+
+   /*
+    * Writes out everything the inflater can currently produce.
+    */
+   private void drain() throws DataFormatException, IOException
+   {
+      int n = inflater.inflate(outputBuffer);
+
+      while (n > 0)
+      {
+         output.write(outputBuffer, 0, n);
+         n = inflater.inflate(outputBuffer);
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/utils/JNDIUtil.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/utils/JNDIUtil.java b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/JNDIUtil.java
new file mode 100644
index 0000000..0af4b02
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/JNDIUtil.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.utils;
+
+import java.util.StringTokenizer;
+
+import javax.naming.Binding;
+import javax.naming.Context;
+import javax.naming.NameNotFoundException;
+import javax.naming.NamingEnumeration;
+import javax.naming.NamingException;
+
+/**
+ * Static helpers for building, populating and tearing down JNDI context trees.
+ *
+ * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
+ *
+ */
+public class JNDIUtil
+{
+   // Constants -----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   /**
+    * Create a context path recursively.
+    * <p>
+    * The path is split on {@code '/'}; each component is looked up in the current context and
+    * reused when it is already bound to a {@link Context}, or created as a sub-context when the
+    * lookup reports {@link NameNotFoundException}.
+    *
+    * @param c    the context to start from
+    * @param path the {@code '/'}-separated path to create
+    * @return the context for the last path component, or {@code c} itself when the path has no
+    *         components
+    * @throws NamingException if a component is bound to something that is not a {@link Context},
+    *                         or if a lookup / sub-context creation fails
+    */
+   public static Context createContext(final Context c, final String path) throws NamingException
+   {
+      Context crtContext = c;
+      StringTokenizer st = new StringTokenizer(path, "/");
+      while (st.hasMoreTokens())
+      {
+         String tok = st.nextToken();
+
+         Object o;
+         try
+         {
+            o = crtContext.lookup(tok);
+         }
+         catch (NameNotFoundException e)
+         {
+            // Nothing bound under this name yet: create the missing level and descend into it.
+            crtContext = crtContext.createSubcontext(tok);
+            continue;
+         }
+
+         if (!(o instanceof Context))
+         {
+            throw new NamingException("Path " + path + " overwrites and already bound object");
+         }
+         crtContext = (Context)o;
+      }
+      return crtContext;
+   }
+
+   /**
+    * Unbinds everything bound in {@code c}, descending into nested contexts first.
+    * <p>
+    * The binding enumeration is always closed, including when a nested tear-down or an unbind
+    * fails part-way through.
+    *
+    * @param c the context to empty
+    * @throws Exception if listing, recursing or unbinding fails
+    */
+   public static void tearDownRecursively(final Context c) throws Exception
+   {
+      NamingEnumeration<Binding> ne = c.listBindings("");
+      try
+      {
+         while (ne.hasMore())
+         {
+            Binding b = ne.next();
+            String name = b.getName();
+            Object object = b.getObject();
+            if (object instanceof Context)
+            {
+               JNDIUtil.tearDownRecursively((Context)object);
+            }
+            c.unbind(name);
+         }
+      }
+      finally
+      {
+         // Release the enumeration even if iteration was aborted by an exception.
+         ne.close();
+      }
+   }
+
+   /**
+    * Context.rebind() requires that all intermediate contexts and the target context (that named by
+    * all but terminal atomic component of the name) must already exist, otherwise
+    * NameNotFoundException is thrown. This method behaves similar to Context.rebind(), but creates
+    * intermediate contexts, if necessary.
+    * <p>
+    * If {@code rebind} fails for any reason, a plain {@code bind} is attempted instead and its
+    * outcome (success or exception) is what the caller sees.
+    *
+    * @param c        the context to start from
+    * @param jndiName the {@code '/'}-separated name; everything before the last {@code '/'} is
+    *                 treated as the context path
+    * @param o        the object to bind
+    * @throws NamingException if intermediate contexts cannot be created or the fallback
+    *                         {@code bind} fails
+    */
+   public static void rebind(final Context c, final String jndiName, final Object o) throws NamingException
+   {
+      Context context = c;
+      String name = jndiName;
+
+      int idx = jndiName.lastIndexOf('/');
+      if (idx != -1)
+      {
+         context = JNDIUtil.createContext(c, jndiName.substring(0, idx));
+         name = jndiName.substring(idx + 1);
+      }
+
+      try
+      {
+         context.rebind(name, o);
+         return;
+      }
+      catch (Exception ignored)
+      {
+         // Deliberate best effort: whatever made rebind() fail, fall through and try bind().
+      }
+
+      context.bind(name, o);
+   }
+
+   // Attributes ----------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/utils/LinkedList.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/utils/LinkedList.java b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/LinkedList.java
new file mode 100644
index 0000000..3d288e8
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/LinkedList.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.utils;
+
+/**
+ * A LinkedList
+ *
+ * Minimal list contract used in this package: insertion at either end, removal from the head,
+ * and iteration through a {@link LinkedListIterator}. The per-method notes below describe the
+ * behaviour of {@code LinkedListImpl}, the implementation shipped alongside this interface.
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface LinkedList<E>
+{
+   /**
+    * Inserts {@code e} at the front of the list.
+    */
+   void addHead(E e);
+
+   /**
+    * Appends {@code e} at the end of the list.
+    */
+   void addTail(E e);
+
+   /**
+    * Removes the element at the front of the list and returns it.
+    * {@code LinkedListImpl} returns {@code null} when the list is empty.
+    */
+   E poll();
+
+   /**
+    * Returns a new iterator over this list. {@code LinkedListImpl} registers every iterator it
+    * hands out until {@link LinkedListIterator#close()} is called on it.
+    */
+   LinkedListIterator<E> iterator();
+
+   /**
+    * Removes all elements from the list.
+    */
+   void clear();
+
+   /**
+    * Returns the number of elements currently in the list.
+    */
+   int size();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/utils/LinkedListImpl.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/utils/LinkedListImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/LinkedListImpl.java
new file mode 100644
index 0000000..382d471
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/LinkedListImpl.java
@@ -0,0 +1,456 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.utils;
+
+import java.lang.reflect.Array;
+import java.util.NoSuchElementException;
+
+
+/**
+ * A linked list implementation which allows multiple iterators to exist at the same time on the queue, and which see any
+ * elements added or removed from the queue either directly or via iterators.
+ *
+ * The list is doubly linked behind a sentinel {@code head} node that never carries a value. Every
+ * iterator handed out is registered in an array so that it can be "nudged" off a node that is
+ * being removed; iterators must therefore be released with {@link LinkedListIterator#close()}.
+ *
+ * This class is not thread safe.
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class LinkedListImpl<E> implements LinkedList<E>
+{
+   private static final int INITIAL_ITERATOR_ARRAY_SIZE = 10;
+
+   // Sentinel: head.next is the first real element (or null when the list is empty)
+   private final Node<E> head = new Node<E>(null);
+
+   private Node<E> tail = null;
+
+   private int size;
+
+   // We store in an array rather than a Map for the best performance
+   private volatile Iterator[] iters;
+
+   // Number of registered iterators; they occupy iters[0 .. numIters - 1]
+   private int numIters;
+
+   // Slot in iters that the next registered iterator will be written to
+   private int nextIndex;
+
+   public LinkedListImpl()
+   {
+      iters = createIteratorArray(INITIAL_ITERATOR_ARRAY_SIZE);
+   }
+
+   /**
+    * Inserts {@code e} directly after the sentinel, i.e. as the new first element.
+    */
+   @Override
+   public void addHead(E e)
+   {
+      Node<E> node = new Node<E>(e);
+
+      node.next = head.next;
+
+      node.prev = head;
+
+      head.next = node;
+
+      if (size == 0)
+      {
+         tail = node;
+      }
+      else
+      {
+         // Need to set the previous element on the former head
+         node.next.prev = node;
+      }
+
+      size++;
+   }
+
+   /**
+    * Appends {@code e} after the current tail; delegates to {@link #addHead(Object)} when the
+    * list is empty.
+    */
+   @Override
+   public void addTail(E e)
+   {
+      if (size == 0)
+      {
+         addHead(e);
+      }
+      else
+      {
+         Node<E> node = new Node<E>(e);
+
+         node.prev = tail;
+
+         tail.next = node;
+
+         tail = node;
+
+         size++;
+      }
+   }
+
+   /**
+    * Removes and returns the first element.
+    *
+    * @return the value of the removed element, or {@code null} when the list is empty
+    */
+   @Override
+   public E poll()
+   {
+      Node<E> ret = head.next;
+
+      if (ret != null)
+      {
+         removeAfter(head);
+
+         return ret.val;
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+   /**
+    * Removes every element.
+    * <p>
+    * Elements are unlinked one at a time through {@link #removeAfter(Node)} rather than by simply
+    * detaching {@code head.next}: that way iterators parked on a discarded node are nudged off it
+    * (otherwise they would keep returning cleared elements) and the links between discarded nodes
+    * are cut.
+    */
+   @Override
+   public void clear()
+   {
+      while (head.next != null)
+      {
+         removeAfter(head);
+      }
+
+      tail = null;
+
+      size = 0;
+   }
+
+   @Override
+   public int size()
+   {
+      return size;
+   }
+
+   /**
+    * Creates a new iterator; its constructor registers it with this list, so callers must
+    * {@link LinkedListIterator#close() close} it when done.
+    */
+   @Override
+   public LinkedListIterator<E> iterator()
+   {
+      return new Iterator();
+   }
+
+   /**
+    * Renders every node starting at the sentinel head, separated by {@code ", "}.
+    */
+   @Override
+   public String toString()
+   {
+      StringBuilder str = new StringBuilder("LinkedListImpl [ ");
+
+      Node<E> node = head;
+
+      while (node != null)
+      {
+         str.append(node.toString());
+
+         if (node.next != null)
+         {
+            str.append(", ");
+         }
+
+         node = node.next;
+      }
+
+      return str.toString();
+   }
+
+   /**
+    * @return the number of iterators currently registered with this list
+    */
+   public int numIters()
+   {
+      return numIters;
+   }
+
+   private Iterator[] createIteratorArray(int size)
+   {
+      return (Iterator[])Array.newInstance(Iterator.class, size);
+   }
+
+   /**
+    * Unlinks the node that follows {@code node}, fixes up {@code tail} and {@code size}, and
+    * nudges iterators if any is parked on the removed node. {@code node.next} must not be null.
+    */
+   private void removeAfter(Node<E> node)
+   {
+      Node<E> toRemove = node.next;
+
+      node.next = toRemove.next;
+
+      if (toRemove.next != null)
+      {
+         toRemove.next.prev = node;
+      }
+
+      if (toRemove == tail)
+      {
+         tail = node;
+      }
+
+      size--;
+
+      // The removed node still has its own next/prev at this point, which nudged() relies on
+      if (toRemove.iterCount != 0)
+      {
+         LinkedListImpl.this.nudgeIterators(toRemove);
+      }
+
+      //Help GC - otherwise GC potentially has to traverse a very long list to see if elements are reachable, this can result in OOM
+      //https://jira.jboss.org/browse/HORNETQ-469
+      toRemove.next = toRemove.prev = null;
+   }
+
+   /**
+    * Tells every registered iterator that {@code node} is being removed.
+    */
+   private synchronized void nudgeIterators(Node<E> node)
+   {
+      for (int i = 0; i < numIters; i++)
+      {
+         Iterator iter = iters[i];
+         if (iter != null)
+         {
+            iter.nudged(node);
+         }
+      }
+   }
+
+   /**
+    * Registers {@code iter}, doubling the registry array when it is full.
+    */
+   private synchronized void addIter(Iterator iter)
+   {
+      if (numIters == iters.length)
+      {
+         resize(2 * numIters);
+      }
+
+      iters[nextIndex++] = iter;
+
+      numIters++;
+   }
+
+   /**
+    * Replaces the registry array with one of {@code newSize} slots holding the first
+    * {@code numIters} entries.
+    */
+   private synchronized void resize(int newSize)
+   {
+      Iterator[] newIters = createIteratorArray(newSize);
+
+      System.arraycopy(iters, 0, newIters, 0, numIters);
+
+      iters = newIters;
+   }
+
+   /**
+    * Deregisters {@code iter}, compacting the registry and shrinking it again once it is half
+    * empty (but never below the initial size).
+    *
+    * @throws IllegalStateException if {@code iter} is not registered
+    */
+   private synchronized void removeIter(Iterator iter)
+   {
+      for (int i = 0; i < numIters; i++)
+      {
+         if (iter == iters[i])
+         {
+            if (i != numIters - 1)
+            {
+               // Fill in the hole
+
+               System.arraycopy(iters, i + 1, iters, i, numIters - i - 1);
+            }
+
+            // Clear the vacated last slot: either it is the removed iterator itself, or (after
+            // the shift above) a duplicate reference that would otherwise linger in the array
+            iters[numIters - 1] = null;
+
+            numIters--;
+
+            if (numIters >= INITIAL_ITERATOR_ARRAY_SIZE && numIters == iters.length / 2)
+            {
+               resize(numIters);
+            }
+
+            nextIndex--;
+
+            return;
+         }
+      }
+
+      throw new IllegalStateException("Cannot find iter to remove");
+   }
+
+   /**
+    * A list cell. {@code iterCount} counts iterators positioned on this node; removal only
+    * bothers to nudge iterators when it is non-zero.
+    */
+   private static final class Node<E>
+   {
+      Node<E> next;
+
+      Node<E> prev;
+
+      final E val;
+
+      int iterCount;
+
+      Node(E e)
+      {
+         val = e;
+      }
+
+      @Override
+      public String toString()
+      {
+         return "Node, value = " + val;
+      }
+   }
+
+   /**
+    * Iterator that stays valid while the list is modified: it is registered with the list and is
+    * moved off its node by {@link #nudged(Node)} when that node is removed.
+    */
+   private class Iterator implements LinkedListIterator<E>
+   {
+      // Node returned by the most recent non-repeat next(); reset to null by remove()
+      Node<E> last;
+
+      // Node this iterator is positioned on; null while the list has had nothing to offer
+      Node<E> current = head.next;
+
+      // Set by repeat(), consumed by the following next()
+      boolean repeat;
+
+      Iterator()
+      {
+         if (current != null)
+         {
+            current.iterCount++;
+         }
+
+         addIter(this);
+      }
+
+      @Override
+      public void repeat()
+      {
+         repeat = true;
+      }
+
+      @Override
+      public boolean hasNext()
+      {
+         Node<E> e = getNode();
+
+         // The positioned node is deliverable if it has not been returned yet, or if a repeat
+         // was requested
+         if (e != null && (e != last || repeat))
+         {
+            return true;
+         }
+
+         return canAdvance();
+      }
+
+      @Override
+      public E next()
+      {
+         Node<E> e = getNode();
+
+         if (repeat)
+         {
+            repeat = false;
+
+            if (e != null)
+            {
+               return e.val;
+            }
+            else
+            {
+               if (canAdvance())
+               {
+                  advance();
+
+                  e = getNode();
+
+                  return e.val;
+               }
+               else
+               {
+                  throw new NoSuchElementException();
+               }
+            }
+         }
+
+         if (e == null || e == last)
+         {
+            if (canAdvance())
+            {
+               advance();
+
+               e = getNode();
+            }
+            else
+            {
+               throw new NoSuchElementException();
+            }
+         }
+
+         last = e;
+
+         repeat = false;
+
+         return e.val;
+      }
+
+      /**
+       * Removes the node this iterator is positioned on.
+       *
+       * @throws NoSuchElementException if nothing has been returned since creation or since the
+       *                                previous remove, or if the iterator has no current node
+       */
+      @Override
+      public void remove()
+      {
+         if (last == null)
+         {
+            throw new NoSuchElementException();
+         }
+
+         if (current == null)
+         {
+            throw new NoSuchElementException();
+         }
+
+         LinkedListImpl.this.removeAfter(current.prev);
+
+         last = null;
+      }
+
+      /**
+       * Deregisters this iterator from the list.
+       */
+      @Override
+      public void close()
+      {
+         removeIter(this);
+      }
+
+      /**
+       * Called by the list while {@code node} is being removed. If this iterator sits on that
+       * node it moves to the following node when there is one, otherwise back to the previous
+       * real node, otherwise to "no position".
+       */
+      public void nudged(Node<E> node)
+      {
+         if (current == node)
+         {
+            if (canAdvance())
+            {
+               advance();
+            }
+            else
+            {
+               if (current.prev != head)
+               {
+                  current.iterCount--;
+
+                  current = current.prev;
+
+                  current.iterCount++;
+               }
+               else
+               {
+                  current = null;
+               }
+            }
+         }
+      }
+
+      /**
+       * Returns the positioned node, first attaching to the head of the list if this iterator
+       * currently has no position.
+       */
+      private Node<E> getNode()
+      {
+         if (current == null)
+         {
+            current = head.next;
+
+            if (current != null)
+            {
+               current.iterCount++;
+            }
+         }
+
+         if (current != null)
+         {
+            return current;
+         }
+         else
+         {
+            return null;
+         }
+      }
+
+      /**
+       * Reports whether a node follows the positioned one, attaching to the head of the list
+       * first if this iterator currently has no position.
+       */
+      private boolean canAdvance()
+      {
+         if (current == null)
+         {
+            current = head.next;
+
+            if (current != null)
+            {
+               current.iterCount++;
+            }
+         }
+
+         return current != null && current.next != null;
+      }
+
+      /**
+       * Moves to the following node, transferring this iterator's {@code iterCount} share.
+       *
+       * @throws NoSuchElementException if there is no following node
+       */
+      private void advance()
+      {
+         if (current == null || current.next == null)
+         {
+            throw new NoSuchElementException();
+         }
+
+         current.iterCount--;
+
+         current = current.next;
+
+         current.iterCount++;
+      }
+
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/utils/LinkedListIterator.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/utils/LinkedListIterator.java b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/LinkedListIterator.java
new file mode 100644
index 0000000..8100a78
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/LinkedListIterator.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.utils;
+
+import java.util.Iterator;
+
+
+/**
+ * A LinkedListIterator
+ *
+ * This iterator allows the last element to be repeated in the next call to hasNext or next
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface LinkedListIterator<E> extends Iterator<E>
+{
+   /**
+    * Requests that the last element be offered again by the next call to hasNext or next.
+    */
+   void repeat();
+
+   /**
+    * Releases this iterator. In {@code LinkedListImpl} this deregisters the iterator from the
+    * list that created it.
+    */
+   void close();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/utils/MemorySize.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/utils/MemorySize.java b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/MemorySize.java
new file mode 100644
index 0000000..b9633b6
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/MemorySize.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.utils;
+
+import org.apache.activemq6.core.client.HornetQClientLogger;
+
+import java.lang.ref.WeakReference;
+
+
+/**
+ * A MemorySize
+ *
+ * Estimates the heap footprint of objects by sampling the JVM's used memory before and after
+ * allocating a batch of them.
+ *
+ * @author Clebert Suconic
+ * @author Tim Fox
+ *
+ *
+ */
+public class MemorySize
+{
+   // Number of instances allocated for the measured run
+   private static final int numberOfObjects = 10000;
+
+   private static Object newObject(final ObjectFactory factory) throws Exception
+   {
+      return factory.createObject();
+   }
+
+   /**
+    * Guesses whether the JVM runs on a 64 bit architecture from the {@code os.arch} system
+    * property.
+    *
+    * @return {@code true} if {@code os.arch} contains {@code "64"}; also {@code true} when the
+    *         property is missing or cannot be read
+    */
+   public static boolean is64bitArch()
+   {
+      boolean is64bit = true; // Default to 64 e.g. if can't retrieve property
+
+      try
+      {
+         String arch = System.getProperty("os.arch");
+
+         if (arch != null)
+         {
+            is64bit = arch.contains("64");
+         }
+      }
+      catch (Exception e)
+      {
+         // Ignore
+      }
+
+      return is64bit;
+   }
+
+   /**
+    * Supplies the instances whose size is to be measured.
+    */
+   public interface ObjectFactory
+   {
+      Object createObject();
+   }
+
+   /**
+    * Estimates the average heap size of the objects produced by {@code factory}.
+    * <p>
+    * After a warm-up run whose results are discarded, used heap is sampled (each sample follows
+    * several forced garbage collections), {@code numberOfObjects} instances are allocated and
+    * kept reachable, and used heap is sampled again. A warning is logged when the JVM's total
+    * memory changed between the two samples, since that makes the figure unreliable.
+    *
+    * @param factory creates the objects to measure
+    * @return the heap growth divided by the number of objects allocated, rounded to an int
+    * @throws Exception if the factory fails
+    */
+   public static int calculateSize(final ObjectFactory factory) throws Exception
+   {
+      final Runtime runtime = Runtime.getRuntime();
+
+      MemorySize.getMemorySize(runtime);
+
+      MemorySize.newObject(factory);
+
+      // First we do a dry run with twice as many then throw away the results
+
+      Object[] obj = new Object[MemorySize.numberOfObjects * 2];
+
+      for (int i = 0; i < MemorySize.numberOfObjects * 2; i++)
+      {
+         obj[i] = MemorySize.newObject(factory);
+      }
+
+      // Drop the dry-run objects; the fresh array is allocated before the first sample so that
+      // it is not counted in the measured difference
+      obj = new Object[MemorySize.numberOfObjects * 2];
+
+      final long heap1 = MemorySize.getMemorySize(runtime);
+
+      final long totalMemory1 = runtime.totalMemory();
+
+      for (int i = 0; i < MemorySize.numberOfObjects; i++)
+      {
+         obj[i] = MemorySize.newObject(factory);
+      }
+
+      final long heap2 = MemorySize.getMemorySize(runtime);
+
+      final long totalMemory2 = runtime.totalMemory();
+
+      final int size = Math.round((float)(heap2 - heap1) / MemorySize.numberOfObjects);
+
+      if (totalMemory1 != totalMemory2)
+      {
+         // throw new IllegalStateException("Warning: JVM allocated more data what would make results invalid " +
+         // totalMemory1 + ":" + totalMemory2);
+
+         HornetQClientLogger.LOGGER.jvmAllocatedMoreMemory(totalMemory1, totalMemory2);
+      }
+
+      return size;
+   }
+
+   /**
+    * Forces garbage collection five times, then returns total memory minus free memory.
+    */
+   private static long getMemorySize(final Runtime runtime)
+   {
+      for (int i = 0; i < 5; i++)
+      {
+         MemorySize.forceGC();
+      }
+      return runtime.totalMemory() - runtime.freeMemory();
+   }
+
+   /**
+    * Requests garbage collection until a freshly created weakly-referenced object has been
+    * cleared, sleeping between attempts.
+    * <p>
+    * An interrupt received while sleeping does not abort the wait, but the thread's interrupt
+    * status is restored before returning so that callers can still observe it.
+    */
+   private static void forceGC()
+   {
+      WeakReference<Object> dumbReference = new WeakReference<Object>(new Object());
+      boolean interrupted = false;
+      // A loop that will wait GC, using the minimal time as possible
+      while (dumbReference.get() != null)
+      {
+         System.gc();
+         try
+         {
+            Thread.sleep(500);
+         }
+         catch (InterruptedException e)
+         {
+            // Keep waiting for the collection, but remember the interrupt instead of losing it
+            interrupted = true;
+         }
+      }
+
+      if (interrupted)
+      {
+         Thread.currentThread().interrupt();
+      }
+   }
+
+}


Mime
View raw message