activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [49/68] [abbrv] activemq-artemis git commit: fixing SlowConsumerDetection
Date Mon, 21 Mar 2016 22:56:05 GMT
fixing SlowConsumerDetection


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/6fc5d6b4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/6fc5d6b4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/6fc5d6b4

Branch: refs/heads/refactor-openwire
Commit: 6fc5d6b421b98cf16ab85e64614b260698b5216a
Parents: 6c650e1
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Thu Feb 25 14:40:04 2016 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon Mar 21 18:54:50 2016 -0400

----------------------------------------------------------------------
 .../core/protocol/openwire/OpenWireConnection.java | 12 +++++++++---
 .../protocol/openwire/OpenWireProtocolManager.java | 17 ++++-------------
 .../core/server/impl/ServerConsumerImpl.java       | 17 +++++++++++++++++
 3 files changed, 30 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fc5d6b4/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 6f2e3be..dc2a8a6 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -204,6 +204,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements
Se
 
          boolean responseRequired = command.isResponseRequired();
          int commandId = command.getCommandId();
+
+
+         // TODO-NOW: the server should send packets to the client based on the requested
times
+         //           need to look at what Andy did on AMQP
+
          // the connection handles pings, negotiations directly.
          // and delegate all other commands to manager.
          if (command.getClass() == KeepAliveInfo.class) {
@@ -1196,12 +1201,12 @@ public class OpenWireConnection extends AbstractRemotingConnection
implements Se
 
       @Override
       public Response processMessageDispatch(MessageDispatch arg0) throws Exception {
-         throw new IllegalStateException("not implemented! ");
+         return null;
       }
 
       @Override
       public Response processMessageDispatchNotification(MessageDispatchNotification arg0)
throws Exception {
-         throw new IllegalStateException("not implemented! ");
+         return null;
       }
 
       @Override
@@ -1222,7 +1227,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements
Se
 
       @Override
       public Response processProducerAck(ProducerAck arg0) throws Exception {
-         throw new IllegalStateException("not implemented! ");
+         // a broker doesn't do producers.. this shouldn't happen
+         return null;
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fc5d6b4/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index bdf27f8..514a2b9 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -17,14 +17,12 @@
 package org.apache.activemq.artemis.core.protocol.openwire;
 
 import javax.jms.InvalidClientIDException;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -44,7 +42,6 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnectio
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
 import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
-import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@@ -60,7 +57,6 @@ import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionControl;
-import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.MessageDispatch;
@@ -91,21 +87,14 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>,
Cl
 
    private OpenWireFormatFactory wireFactory;
 
-   private boolean tightEncodingEnabled = true;
-
    private boolean prefixPacketSize = true;
 
    private BrokerId brokerId;
    protected final ProducerId advisoryProducerId = new ProducerId();
 
-   // from broker
-   protected final Map<ConnectionId, OpenWireConnection> brokerConnectionStates = Collections.synchronizedMap(new
HashMap<ConnectionId, OpenWireConnection>());
-
    private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<>();
 
-   protected final ConcurrentMap<ConnectionId, ConnectionInfo> connectionInfos = new
ConcurrentHashMap<>();
-
-   // Clebert TODO: use ConcurrentHashMap, or maybe use the schema that's already available
on Artemis upstream (unique-client-id)
+   // TODO-NOW: this can probably go away
    private final Map<String, AMQConnectionContext> clientIdSet = new HashMap<String,
AMQConnectionContext>();
 
    private String brokerName;
@@ -133,11 +122,13 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>,
Cl
       // preferred prop, should be done via config
       wireFactory.setCacheEnabled(false);
       advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
-      ManagementService service = server.getManagementService();
       scheduledPool = server.getScheduledPool();
 
       final ClusterManager clusterManager = this.server.getClusterManager();
+
+      // TODO-NOW: use a property name for the cluster connection
       ClusterConnection cc = clusterManager.getDefaultConnection(null);
+
       if (cc != null) {
          cc.addClusterTopologyListener(this);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fc5d6b4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 545b4dc..b5ea5d9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -89,6 +89,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
 
    private Object protocolContext;
 
+   private SlowConsumerDetectionListener slowConsumerListener;
+
    /**
     * We get a readLock when a message is handled, and return the readLock when the message
is finally delivered
     * When stopping the consumer we need to get a writeLock to make sure we had all delivery
finished
@@ -223,6 +225,21 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
    // ----------------------------------------------------------------------
 
    @Override
+   public void setlowConsumerDetection(SlowConsumerDetectionListener listener) {
+      this.slowConsumerListener = listener;
+   }
+
+   @Override
+   public SlowConsumerDetectionListener getSlowConsumerDetecion() {
+      return slowConsumerListener;
+   }
+
+   @Override
+   public void fireSlowConsumer() {
+      slowConsumerListener.onSlowConsumer(this);
+   }
+
+   @Override
    public Object getProtocolContext() {
       return protocolContext;
    }


Mime
View raw message