activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [30/51] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 - Rename packages to ActiveMQ
Date Tue, 11 Nov 2014 18:42:00 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXAResumeMessage.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXAResumeMessage.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXAResumeMessage.java
new file mode 100644
index 0000000..6dd712c
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXAResumeMessage.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.core.impl.wireformat;
+
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq6.utils.XidCodecSupport;
+
+/**
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public class SessionXAResumeMessage extends PacketImpl
+{
+
+   private Xid xid;
+
+   public SessionXAResumeMessage(final Xid xid)
+   {
+      super(SESS_XA_RESUME);
+
+      this.xid = xid;
+   }
+
+   public SessionXAResumeMessage()
+   {
+      super(SESS_XA_RESUME);
+   }
+
+   // Public --------------------------------------------------------
+
+   public Xid getXid()
+   {
+      return xid;
+   }
+
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      XidCodecSupport.encodeXid(xid, buffer);
+   }
+
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      xid = XidCodecSupport.decodeXid(buffer);
+   }
+
+   @Override
+   public int hashCode()
+   {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result + ((xid == null) ? 0 : xid.hashCode());
+      return result;
+   }
+
+   @Override
+   public boolean equals(Object obj)
+   {
+      if (this == obj)
+         return true;
+      if (!super.equals(obj))
+         return false;
+      if (!(obj instanceof SessionXAResumeMessage))
+         return false;
+      SessionXAResumeMessage other = (SessionXAResumeMessage)obj;
+      if (xid == null)
+      {
+         if (other.xid != null)
+            return false;
+      }
+      else if (!xid.equals(other.xid))
+         return false;
+      return true;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java
new file mode 100644
index 0000000..fc0679a
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.core.impl.wireformat;
+
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq6.utils.XidCodecSupport;
+
+/**
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class SessionXARollbackMessage extends PacketImpl
+{
+   private Xid xid;
+
+   public SessionXARollbackMessage(final Xid xid)
+   {
+      super(SESS_XA_ROLLBACK);
+
+      this.xid = xid;
+   }
+
+   public SessionXARollbackMessage()
+   {
+      super(SESS_XA_ROLLBACK);
+   }
+
+   // Public --------------------------------------------------------
+
+   public Xid getXid()
+   {
+      return xid;
+   }
+
+
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      XidCodecSupport.encodeXid(xid, buffer);
+   }
+
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      xid = XidCodecSupport.decodeXid(buffer);
+   }
+
+   @Override
+   public boolean isAsyncExec()
+   {
+      return true;
+   }
+
+   @Override
+   public int hashCode()
+   {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result + ((xid == null) ? 0 : xid.hashCode());
+      return result;
+   }
+
+   @Override
+   public boolean equals(Object obj)
+   {
+      if (this == obj)
+         return true;
+      if (!super.equals(obj))
+         return false;
+      if (!(obj instanceof SessionXARollbackMessage))
+         return false;
+      SessionXARollbackMessage other = (SessionXARollbackMessage)obj;
+      if (xid == null)
+      {
+         if (other.xid != null)
+            return false;
+      }
+      else if (!xid.equals(other.xid))
+         return false;
+      return true;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXASetTimeoutMessage.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXASetTimeoutMessage.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXASetTimeoutMessage.java
new file mode 100644
index 0000000..94b5f28
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXASetTimeoutMessage.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.core.protocol.core.impl.PacketImpl;
+
+/**
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class SessionXASetTimeoutMessage extends PacketImpl
+{
+   private int timeoutSeconds;
+
+   public SessionXASetTimeoutMessage(final int timeoutSeconds)
+   {
+      super(SESS_XA_SET_TIMEOUT);
+
+      this.timeoutSeconds = timeoutSeconds;
+   }
+
+   public SessionXASetTimeoutMessage()
+   {
+      super(SESS_XA_SET_TIMEOUT);
+   }
+
+   // Public --------------------------------------------------------
+
+   public int getTimeoutSeconds()
+   {
+      return timeoutSeconds;
+   }
+
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeInt(timeoutSeconds);
+   }
+
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      timeoutSeconds = buffer.readInt();
+   }
+
+   @Override
+   public int hashCode()
+   {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result + timeoutSeconds;
+      return result;
+   }
+
+   @Override
+   public boolean equals(Object obj)
+   {
+      if (this == obj)
+         return true;
+      if (!super.equals(obj))
+         return false;
+      if (!(obj instanceof SessionXASetTimeoutMessage))
+         return false;
+      SessionXASetTimeoutMessage other = (SessionXASetTimeoutMessage)obj;
+      if (timeoutSeconds != other.timeoutSeconds)
+         return false;
+      return true;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXASetTimeoutResponseMessage.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXASetTimeoutResponseMessage.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXASetTimeoutResponseMessage.java
new file mode 100644
index 0000000..4ac036e
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXASetTimeoutResponseMessage.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.core.protocol.core.impl.PacketImpl;
+
+/**
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class SessionXASetTimeoutResponseMessage extends PacketImpl
+{
+   private boolean ok;
+
+   public SessionXASetTimeoutResponseMessage(final boolean ok)
+   {
+      super(SESS_XA_SET_TIMEOUT_RESP);
+
+      this.ok = ok;
+   }
+
+   public SessionXASetTimeoutResponseMessage()
+   {
+      super(SESS_XA_SET_TIMEOUT_RESP);
+   }
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public boolean isResponse()
+   {
+      return true;
+   }
+
+   public boolean isOK()
+   {
+      return ok;
+   }
+
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeBoolean(ok);
+   }
+
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      ok = buffer.readBoolean();
+   }
+
+   @Override
+   public int hashCode()
+   {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result + (ok ? 1231 : 1237);
+      return result;
+   }
+
+   @Override
+   public boolean equals(Object obj)
+   {
+      if (this == obj)
+         return true;
+      if (!super.equals(obj))
+         return false;
+      if (!(obj instanceof SessionXASetTimeoutResponseMessage))
+         return false;
+      SessionXASetTimeoutResponseMessage other = (SessionXASetTimeoutResponseMessage)obj;
+      if (ok != other.ok)
+         return false;
+      return true;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXAStartMessage.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXAStartMessage.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXAStartMessage.java
new file mode 100644
index 0000000..ad2a73b
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXAStartMessage.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.core.impl.wireformat;
+
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq6.utils.XidCodecSupport;
+
+/**
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class SessionXAStartMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private Xid xid;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public SessionXAStartMessage(final Xid xid)
+   {
+      super(SESS_XA_START);
+
+      this.xid = xid;
+   }
+
+   public SessionXAStartMessage()
+   {
+      super(SESS_XA_START);
+   }
+
+   // Public --------------------------------------------------------
+
+   public Xid getXid()
+   {
+      return xid;
+   }
+
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      XidCodecSupport.encodeXid(xid, buffer);
+   }
+
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      xid = XidCodecSupport.decodeXid(buffer);
+   }
+
+   @Override
+   public int hashCode()
+   {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result + ((xid == null) ? 0 : xid.hashCode());
+      return result;
+   }
+
+   @Override
+   public boolean equals(Object obj)
+   {
+      if (this == obj)
+         return true;
+      if (!super.equals(obj))
+         return false;
+      if (!(obj instanceof SessionXAStartMessage))
+         return false;
+      SessionXAStartMessage other = (SessionXAStartMessage)obj;
+      if (xid == null)
+      {
+         if (other.xid != null)
+            return false;
+      }
+      else if (!xid.equals(other.xid))
+         return false;
+      return true;
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java
new file mode 100644
index 0000000..d838779
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.core.protocol.core.impl.PacketImpl;
+
+/**
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class SubscribeClusterTopologyUpdatesMessage extends PacketImpl
+{
+
+   private boolean clusterConnection;
+
+   public SubscribeClusterTopologyUpdatesMessage(final boolean clusterConnection)
+   {
+      super(SUBSCRIBE_TOPOLOGY);
+
+      this.clusterConnection = clusterConnection;
+   }
+
+   protected SubscribeClusterTopologyUpdatesMessage(byte packetType, final boolean clusterConnection)
+   {
+      super(packetType);
+
+      this.clusterConnection = clusterConnection;
+   }
+
+   public SubscribeClusterTopologyUpdatesMessage()
+   {
+      super(SUBSCRIBE_TOPOLOGY);
+   }
+
+   protected SubscribeClusterTopologyUpdatesMessage(byte packetType)
+   {
+      super(packetType);
+   }
+
+   // Public --------------------------------------------------------
+
+   public boolean isClusterConnection()
+   {
+      return clusterConnection;
+   }
+
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeBoolean(clusterConnection);
+   }
+
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      clusterConnection = buffer.readBoolean();
+   }
+
+   @Override
+   public String toString()
+   {
+      return "SubscribeClusterTopologyUpdatesMessage [clusterConnection=" + clusterConnection +
+             ", toString()=" +
+             super.toString() +
+             "]";
+   }
+
+   @Override
+   public int hashCode()
+   {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result + (clusterConnection ? 1231 : 1237);
+      return result;
+   }
+
+   @Override
+   public boolean equals(Object obj)
+   {
+      if (this == obj)
+         return true;
+      if (!super.equals(obj))
+         return false;
+      if (!(obj instanceof SubscribeClusterTopologyUpdatesMessage))
+         return false;
+      SubscribeClusterTopologyUpdatesMessage other = (SubscribeClusterTopologyUpdatesMessage)obj;
+      if (clusterConnection != other.clusterConnection)
+         return false;
+      return true;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java
new file mode 100644
index 0000000..a129ce0
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+
+/**
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class SubscribeClusterTopologyUpdatesMessageV2 extends SubscribeClusterTopologyUpdatesMessage
+{
+
+   private int clientVersion;
+
+   public SubscribeClusterTopologyUpdatesMessageV2(final boolean clusterConnection, int clientVersion)
+   {
+      super(SUBSCRIBE_TOPOLOGY_V2, clusterConnection);
+
+      this.clientVersion = clientVersion;
+   }
+
+   public SubscribeClusterTopologyUpdatesMessageV2()
+   {
+      super(SUBSCRIBE_TOPOLOGY_V2);
+   }
+
+   // Public --------------------------------------------------------
+
+
+
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      super.encodeRest(buffer);
+      buffer.writeInt(clientVersion);
+   }
+
+   /**
+    * @return the clientVersion
+    */
+   public int getClientVersion()
+   {
+      return clientVersion;
+   }
+
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      super.decodeRest(buffer);
+      clientVersion = buffer.readInt();
+   }
+
+   @Override
+   public int hashCode()
+   {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result + clientVersion;
+      return result;
+   }
+
+   @Override
+   public boolean equals(Object obj)
+   {
+      if (this == obj)
+         return true;
+      if (!super.equals(obj))
+         return false;
+      if (!(obj instanceof SubscribeClusterTopologyUpdatesMessageV2))
+         return false;
+      SubscribeClusterTopologyUpdatesMessageV2 other = (SubscribeClusterTopologyUpdatesMessageV2)obj;
+      if (clientVersion != other.clientVersion)
+         return false;
+      return true;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/CloseListener.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/CloseListener.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/CloseListener.java
new file mode 100644
index 0000000..ea74b0e
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/CloseListener.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.remoting;
+
+/**
+ * CloseListeners can be registered with a {@link org.apache.activemq6.spi.core.protocol.RemotingConnection} to get notified when the connection is closed.
+ * <p>
+ * {@link org.apache.activemq6.spi.core.protocol.RemotingConnection#addCloseListener(CloseListener)}
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public interface CloseListener
+{
+   /**
+    * called when the connection is closed
+    */
+   void connectionClosed();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/FailureListener.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/FailureListener.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/FailureListener.java
new file mode 100644
index 0000000..7274f14
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/FailureListener.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.remoting;
+
+import org.apache.activemq6.api.core.HornetQException;
+
+/**
+ * A FailureListener notifies the user when a connection failure occurred.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public interface FailureListener
+{
+   /**
+    * Notifies that a connection has failed due to the specified exception.
+    *
+    * @param exception exception which has caused the connection to fail
+    * @param failedOver
+    */
+   void connectionFailed(HornetQException exception, boolean failedOver);
+
+   /**
+    * Notifies that a connection has failed due to the specified exception.
+    *
+    * @param exception exception which has caused the connection to fail
+    * @param failedOver
+    * @param scaleDownTargetNodeID the ID of the node to which messages are scaling down
+    */
+   void connectionFailed(HornetQException exception, boolean failedOver, String scaleDownTargetNodeID);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/TransportConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/TransportConfigurationUtil.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/TransportConfigurationUtil.java
new file mode 100644
index 0000000..66e1e27
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/TransportConfigurationUtil.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.core.remoting.impl;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq6.api.core.TransportConfigurationHelper;
+import org.apache.activemq6.utils.ClassloadingUtil;
+
+/**
+ * Stores static mappings of class names to ConnectorFactory instances to act as a central repo for ConnectorFactory
+ * objects.
+ *
+ * @author <a href="mailto:mtaylor@redhat.com">Martyn Taylor</a>
+ */
+
+public class TransportConfigurationUtil
+{
+   private static final Map<String, Map<String, Object>> DEFAULTS = new HashMap<>();
+
+   private static final HashMap<String, Object> EMPTY_HELPER = new HashMap<>();
+
+   public static Map<String, Object> getDefaults(String className)
+   {
+      if (className == null)
+      {
+         /* Returns a new clone of the empty helper.  This allows any parent objects to update the map key/values
+            without polluting the EMPTY_HELPER map. */
+         return (Map<String, Object>) EMPTY_HELPER.clone();
+      }
+
+      if (!DEFAULTS.containsKey(className))
+      {
+         Object object = instantiateObject(className);
+         if (object != null && object instanceof TransportConfigurationHelper)
+         {
+
+            DEFAULTS.put(className, ((TransportConfigurationHelper) object).getDefaults());
+         }
+         else
+         {
+            DEFAULTS.put(className, EMPTY_HELPER);
+         }
+      }
+
+      /* We need to return a copy of the default Map.  This means the defaults parent is able to update the map without
+      modifying the original */
+      return cloneDefaults(DEFAULTS.get(className));
+   }
+
+   private static Object instantiateObject(final String className)
+   {
+      return AccessController.doPrivileged(new PrivilegedAction<Object>()
+      {
+         public Object run()
+         {
+            try
+            {
+               return ClassloadingUtil.newInstanceFromClassLoader(className);
+            }
+            catch (IllegalStateException e)
+            {
+               return null;
+            }
+         }
+      });
+   }
+
+   private static Map<String, Object> cloneDefaults(Map<String, Object> defaults)
+   {
+      Map<String, Object> cloned = new HashMap<String, Object>();
+      for (Map.Entry entry : defaults.entrySet())
+      {
+         cloned.put((String) entry.getKey(), entry.getValue());
+      }
+      return cloned;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/HornetQAMQPFrameDecoder.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/HornetQAMQPFrameDecoder.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/HornetQAMQPFrameDecoder.java
new file mode 100644
index 0000000..7890953
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/HornetQAMQPFrameDecoder.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.remoting.impl.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+/**
+ * A Netty decoder specially optimised to to decode messages on the core protocol only
+ *
+ * @author <a href="tlee@redhat.com">Trustin Lee</a>
+ * @author <a href="nmaurer@redhat.com">Norman Maurer</a>
+ * @version $Revision: 7839 $, $Date: 2009-08-21 02:26:39 +0900 (2009-08-21, 금) $
+ */
+public class HornetQAMQPFrameDecoder extends LengthFieldBasedFrameDecoder
+{
+   public HornetQAMQPFrameDecoder()
+   {
+      // The interface itself is part of the buffer (hence the -4)
+      super(Integer.MAX_VALUE, 0, 4, -4 , 0);
+   }
+
+
+   @Override
+   protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length)
+   {
+      return super.extractFrame(ctx, buffer, index, length);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/HornetQChannelHandler.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/HornetQChannelHandler.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/HornetQChannelHandler.java
new file mode 100644
index 0000000..5cbcb14
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/HornetQChannelHandler.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.remoting.impl.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.group.ChannelGroup;
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq6.core.client.HornetQClientLogger;
+import org.apache.activemq6.core.client.HornetQClientMessageBundle;
+import org.apache.activemq6.spi.core.remoting.BufferHandler;
+import org.apache.activemq6.spi.core.remoting.ConnectionLifeCycleListener;
+
+
+/**
+ * Common handler implementation for client and server side handler.
+ *
+ * @author <a href="mailto:tlee@redhat.com">Trustin Lee</a>
+ * @author <a href="mailto:nmaurer@redhat.com">Norman Maurer</a>
+ * @version $Rev$, $Date$
+ */
+public class HornetQChannelHandler extends ChannelDuplexHandler
+{
+   private final ChannelGroup group;
+
+   private final BufferHandler handler;
+
+   private final ConnectionLifeCycleListener listener;
+
+   volatile boolean active;
+
+   protected HornetQChannelHandler(final ChannelGroup group,
+                                   final BufferHandler handler,
+                                   final ConnectionLifeCycleListener listener)
+   {
+      this.group = group;
+      this.handler = handler;
+      this.listener = listener;
+   }
+
+   @Override
+   public void channelActive(final ChannelHandlerContext ctx) throws Exception
+   {
+      group.add(ctx.channel());
+      ctx.fireChannelActive();
+   }
+
+   @Override
+   public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception
+   {
+      // TODO: Think about the id thingy
+      listener.connectionReadyForWrites(channelId(ctx.channel()), ctx.channel().isWritable());
+   }
+
+   @Override
+   public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception
+   {
+      ByteBuf buffer = (ByteBuf) msg;
+
+      handler.bufferReceived(channelId(ctx.channel()), new ChannelBufferWrapper(buffer));
+   }
+
+   @Override
+   public void channelInactive(final ChannelHandlerContext ctx) throws Exception
+   {
+      synchronized (this)
+      {
+         if (active)
+         {
+            listener.connectionDestroyed(channelId(ctx.channel()));
+
+            active = false;
+         }
+      }
+   }
+
+   @Override
+   public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception
+   {
+      if (!active)
+      {
+         return;
+      }
+      // We don't want to log this - since it is normal for this to happen during failover/reconnect
+      // and we don't want to spew out stack traces in that event
+      // The user has access to this exeception anyway via the HornetQException initial cause
+
+      HornetQException me = HornetQClientMessageBundle.BUNDLE.nettyError();
+      me.initCause(cause);
+
+      synchronized (listener)
+      {
+         try
+         {
+            listener.connectionException(channelId(ctx.channel()), me);
+            active = false;
+         }
+         catch (Exception ex)
+         {
+            HornetQClientLogger.LOGGER.errorCallingLifeCycleListener(ex);
+         }
+      }
+   }
+
+   protected static int channelId(Channel channel)
+   {
+      return channel.hashCode();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/HornetQFrameDecoder2.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/HornetQFrameDecoder2.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/HornetQFrameDecoder2.java
new file mode 100644
index 0000000..31f9c80
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/HornetQFrameDecoder2.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.remoting.impl.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import org.apache.activemq6.utils.DataConstants;
+
+/**
+ * A Netty decoder specially optimised to to decode messages on the core protocol only
+ *
+ * @author <a href="tlee@redhat.com">Trustin Lee</a>
+ * @author <a href="nmaurer@redhat.com">Norman Maurer</a>
+ * @version $Revision: 7839 $, $Date: 2009-08-21 02:26:39 +0900 (2009-08-21, 금) $
+ */
+public class HornetQFrameDecoder2 extends LengthFieldBasedFrameDecoder
+{
+   public HornetQFrameDecoder2()
+   {
+      super(Integer.MAX_VALUE, 0, DataConstants.SIZE_INT);
+   }
+
+   @Override
+   protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length)
+   {
+      return super.extractFrame(ctx, buffer, index, length).skipBytes(DataConstants.SIZE_INT);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/NettyConnection.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/NettyConnection.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/NettyConnection.java
new file mode 100644
index 0000000..015a864
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/NettyConnection.java
@@ -0,0 +1,448 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.remoting.impl.netty;
+
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Semaphore;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoop;
+import io.netty.handler.ssl.SslHandler;
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.HornetQBuffers;
+import org.apache.activemq6.api.core.HornetQInterruptedException;
+import org.apache.activemq6.api.core.TransportConfiguration;
+import org.apache.activemq6.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq6.core.client.HornetQClientLogger;
+import org.apache.activemq6.core.security.HornetQPrincipal;
+import org.apache.activemq6.spi.core.protocol.RemotingConnection;
+import org.apache.activemq6.spi.core.remoting.Connection;
+import org.apache.activemq6.spi.core.remoting.ConnectionLifeCycleListener;
+import org.apache.activemq6.spi.core.remoting.ReadyListener;
+import org.apache.activemq6.utils.ConcurrentHashSet;
+
+/**
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="nmaurer@redhat.com">Norman Maurer</a>
+ */
+public class NettyConnection implements Connection
+{
+   // Constants -----------------------------------------------------
+   private static final int BATCHING_BUFFER_SIZE = 8192;
+
+   // Attributes ----------------------------------------------------
+
+   protected final Channel channel;
+
+   private boolean closed;
+
+   private final ConnectionLifeCycleListener listener;
+
+   private final boolean batchingEnabled;
+
+   private final boolean directDeliver;
+
+   private volatile HornetQBuffer batchBuffer;
+
+   private final Map<String, Object> configuration;
+
+   private final Semaphore writeLock = new Semaphore(1);
+
+   private final Set<ReadyListener> readyListeners = new ConcurrentHashSet<ReadyListener>();
+
+   private RemotingConnection protocolConnection;
+
+// Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public NettyConnection(final Map<String, Object> configuration,
+                          final Channel channel,
+                          final ConnectionLifeCycleListener listener,
+                          boolean batchingEnabled,
+                          boolean directDeliver)
+   {
+      this.configuration = configuration;
+
+      this.channel = channel;
+
+      this.listener = listener;
+
+      this.batchingEnabled = batchingEnabled;
+
+      this.directDeliver = directDeliver;
+   }
+
+   // Public --------------------------------------------------------
+
+   public Channel getNettyChannel()
+   {
+      return channel;
+   }
+   // Connection implementation ----------------------------
+
+
+   public void forceClose()
+   {
+      if (channel != null)
+      {
+         try
+         {
+            channel.close();
+         }
+         catch (Throwable e)
+         {
+            HornetQClientLogger.LOGGER.warn(e.getMessage(), e);
+         }
+      }
+   }
+
+
+   /**
+    * This is exposed so users would have the option to look at any data through interceptors
+    *
+    * @return
+    */
+   public Channel getChannel()
+   {
+      return channel;
+   }
+
+   public RemotingConnection getProtocolConnection()
+   {
+      return protocolConnection;
+   }
+
+   public void setProtocolConnection(RemotingConnection protocolConnection)
+   {
+      this.protocolConnection = protocolConnection;
+   }
+
+   public void close()
+   {
+      if (closed)
+      {
+         return;
+      }
+
+      final SslHandler sslHandler = (SslHandler) channel.pipeline().get("ssl");
+      EventLoop eventLoop = channel.eventLoop();
+      boolean inEventLoop = eventLoop.inEventLoop();
+      //if we are in an event loop we need to close the channel after the writes have finished
+      if (!inEventLoop)
+      {
+         closeSSLAndChannel(sslHandler, channel);
+      }
+      else
+      {
+         eventLoop.execute(new Runnable()
+         {
+            @Override
+            public void run()
+            {
+               closeSSLAndChannel(sslHandler, channel);
+            }
+         });
+      }
+
+      closed = true;
+
+      listener.connectionDestroyed(getID());
+   }
+
+   public HornetQBuffer createBuffer(final int size)
+   {
+      return new ChannelBufferWrapper(channel.alloc().buffer(size));
+   }
+
+   public Object getID()
+   {
+      // TODO: Think of it
+      return channel.hashCode();
+   }
+
+   // This is called periodically to flush the batch buffer
+   public void checkFlushBatchBuffer()
+   {
+      if (!batchingEnabled)
+      {
+         return;
+      }
+
+      if (writeLock.tryAcquire())
+      {
+         try
+         {
+            if (batchBuffer != null && batchBuffer.readable())
+            {
+               channel.writeAndFlush(batchBuffer.byteBuf());
+
+               batchBuffer = createBuffer(BATCHING_BUFFER_SIZE);
+            }
+         }
+         finally
+         {
+            writeLock.release();
+         }
+      }
+   }
+
+   public void write(final HornetQBuffer buffer)
+   {
+      write(buffer, false, false);
+   }
+
+   public void write(HornetQBuffer buffer, final boolean flush, final boolean batched)
+   {
+      write(buffer, flush, batched, null);
+   }
+
+   public void write(HornetQBuffer buffer, final boolean flush, final boolean batched, final ChannelFutureListener futureListener)
+   {
+
+      try
+      {
+         writeLock.acquire();
+
+         try
+         {
+            if (batchBuffer == null && batchingEnabled && batched && !flush)
+            {
+               // Lazily create batch buffer
+
+               batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+            }
+
+            if (batchBuffer != null)
+            {
+               batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
+
+               if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush)
+               {
+                  // If the batch buffer is full or it's flush param or not batched then flush the buffer
+
+                  buffer = batchBuffer;
+               }
+               else
+               {
+                  return;
+               }
+
+               if (!batched || flush)
+               {
+                  batchBuffer = null;
+               }
+               else
+               {
+                  // Create a new buffer
+
+                  batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+               }
+            }
+
+            // depending on if we need to flush or not we can use a voidPromise or
+            // use a normal promise
+            final ByteBuf buf = buffer.byteBuf();
+            final ChannelPromise promise;
+            if (flush || futureListener != null)
+            {
+               promise = channel.newPromise();
+            }
+            else
+            {
+               promise = channel.voidPromise();
+            }
+
+            EventLoop eventLoop = channel.eventLoop();
+            boolean inEventLoop = eventLoop.inEventLoop();
+            if (!inEventLoop)
+            {
+               if (futureListener != null)
+               {
+                  channel.writeAndFlush(buf, promise).addListener(futureListener);
+               }
+               else
+               {
+                  channel.writeAndFlush(buf, promise);
+               }
+            }
+            else
+            {
+               // create a task which will be picked up by the eventloop and trigger the write.
+               // This is mainly needed as this method is triggered by different threads for the same channel.
+               // if we not do this we may produce out of order writes.
+               final Runnable task = new Runnable()
+               {
+                  @Override
+                  public void run()
+                  {
+                     if (futureListener != null)
+                     {
+                        channel.writeAndFlush(buf, promise).addListener(futureListener);
+                     }
+                     else
+                     {
+                        channel.writeAndFlush(buf, promise);
+                     }
+                  }
+               };
+               // execute the task on the eventloop
+               eventLoop.execute(task);
+            }
+
+
+            // only try to wait if not in the eventloop otherwise we will produce a deadlock
+            if (flush && !inEventLoop)
+            {
+               while (true)
+               {
+                  try
+                  {
+                     boolean ok = promise.await(10000);
+
+                     if (!ok)
+                     {
+                        HornetQClientLogger.LOGGER.timeoutFlushingPacket();
+                     }
+
+                     break;
+                  }
+                  catch (InterruptedException e)
+                  {
+                     throw new HornetQInterruptedException(e);
+                  }
+               }
+            }
+         }
+         finally
+         {
+            writeLock.release();
+         }
+      }
+      catch (InterruptedException e)
+      {
+         throw new HornetQInterruptedException(e);
+      }
+   }
+
+   public String getRemoteAddress()
+   {
+      SocketAddress address = channel.remoteAddress();
+      if (address == null)
+      {
+         return null;
+      }
+      return address.toString();
+   }
+
+   public boolean isDirectDeliver()
+   {
+      return directDeliver;
+   }
+
+   public void addReadyListener(final ReadyListener listener)
+   {
+      readyListeners.add(listener);
+   }
+
+   public void removeReadyListener(final ReadyListener listener)
+   {
+      readyListeners.remove(listener);
+   }
+
+   //never allow this
+   public HornetQPrincipal getDefaultHornetQPrincipal()
+   {
+      return null;
+   }
+
+   void fireReady(final boolean ready)
+   {
+      for (ReadyListener listener : readyListeners)
+      {
+         listener.readyForWriting(ready);
+      }
+   }
+
+
+   @Override
+   public TransportConfiguration getConnectorConfig()
+   {
+      if (configuration != null)
+      {
+         return new TransportConfiguration(NettyConnectorFactory.class.getName(), this.configuration);
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+   @Override
+   public boolean isUsingProtocolHandling()
+   {
+      return true;
+   }
+
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public String toString()
+   {
+      return super.toString() + "[local= " + channel.localAddress() + ", remote=" + channel.remoteAddress() + "]";
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+
+   private void closeSSLAndChannel(SslHandler sslHandler, Channel channel)
+   {
+      if (sslHandler != null)
+      {
+         try
+         {
+            ChannelFuture sslCloseFuture = sslHandler.close();
+
+            if (!sslCloseFuture.awaitUninterruptibly(10000))
+            {
+               HornetQClientLogger.LOGGER.timeoutClosingSSL();
+            }
+         }
+         catch (Throwable t)
+         {
+            // ignore
+         }
+      }
+
+      ChannelFuture closeFuture = channel.close();
+      if (!closeFuture.awaitUninterruptibly(10000))
+      {
+         HornetQClientLogger.LOGGER.timeoutClosingNettyChannel();
+      }
+   }
+   // Inner classes -------------------------------------------------
+
+}


Mime
View raw message