activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [42/43] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
Date Tue, 09 Feb 2016 20:21:01 GMT
open wire changes
equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208

More or less what I meant by getting rid of PeerBroker.. we can just use the Topology we already have
(cherry picked from commit 5559f05cc57493ad5f54f214c0b6bc3d92820efb)

put back rebalance and test
(cherry picked from commit 9711d6cf26ddaad5b08c0024e27819e74fef02af from branch new2)

fix some configure issues for tests. so far only one of the two bridges seen started
(cherry picked from commit ec1bfdb65bf314c05f12134e96a5e7d1a854b398)

Reverting configuration stuff, adding new cluster base class
(cherry picked from 8267e8196c969fef8dc6248ed7791b31e6e01a03)

fixing cluster test
(cherry picked from commit ab29219c0f89909bd1687e00353a19e4385eb06e)

getting the first test working
(cherry picked from commit 0e724d1cb462345969f96a0e706799628a2c56dd)

fixing clustering tests
(cherry picked from 2a20696cdec44963ba1320b438d84409a1709809)

Starting with nother test FailoverComplexClusterTest

  I added a NewFailoverComplexClusterTest copied from the
  original test. So I can keep the original for reference.
  Once test passed I'll overwrite the original with the
  new one.
(cherry picked from f9e959d91d99676916a3e218738809c7f0c62b99)

Test fixes
(cherry picked from commit 567f57848ed10f54512b25118c54372e6510afdc)

more tests
(cherry picked from 67633c61c64f2bd0ffab4c5bb608922387427ad8)

Finished FailoverComplexClusterTest, now its tests all pass.
(cherry picked from 8ee4bb845fffd6499665801b73ac2e443dda8de5)

Openwire test work update:
  changed some code to pass protocol specific params to accetpors
  FailoverRandomTest ok
  delete a test that is not relevant
  FailoverPriorityTest (still not working)

(cherry picked from commit 3b9eae2d52dc73f58e4f5106c912cddda95388c6)

FailoverPriorityTest is ok now.
(cherry picked from commit a61abdc3349ad846f53b5703c6ff03a68304ac6f)

    - Fix ReconnectTest
    - Update activemq-client to 5.12.0 for activemq5-unit-tests
    - Remove some tests that are removed from 5.12.0
    - Add a synchronization on JMSServerManagerImpl
    internalCreateQueue to avoid NPE in FailoverClusterTest

    (cherry picked from commit 5d44bcf8ce02f3a0936ee12ecf86a3948e2b3a0c)

Fix connection closing issue which causes
tests teardown takes very long time.
(cherry picked from 50bef42d32d2fae705353b96e2b626d4fc722cc6)

InitalReconnectDelayTest.java
SlowConnectionTest.java
TwoBrokerFailoverClusterTest.java
(cherry picked from a5ebfae1a2af938ad10b1dee2d2c9bf11f8d0116)

Fix a few more tests
(cherry picked from 83a316c37f1a1ba7b36a35261a0fee031d606e96)

- Fix tests:
FanoutTransportBrokerTest, FanoutTest
- Disable server auto-creation for cluster tests
(cherry picked from commit c6d7c7b28b1754b1d1898c038f1c9a0952028cbd)

more tests
(cherry picked from commit f4217734492ffe9be1e8380959f13cfb85ab9e1e)

Fix SoWriteTimeoutClientTest
Add a check in OpenwireConnection.disconnect() to prevent it from reentering.

(cherry picked from commit c74ef17f4d038d772c0d8457194bb83282e87211)

Remove command check in OpenWireConnection
Handle ShutdownInfo command properly
ConnectionCleanupTest (a new test from 12.0)

(cherry picked from commit a2512ff083ab691b7cb0abc27711d6ed78739d66)

Clean up all tests under org.apache.activemq.transport.failover package

(Cherry picked from 181c874fa80ae103df3b8619c97141e49aafc847)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/6a3a994e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/6a3a994e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/6a3a994e

Branch: refs/heads/refactor-openwire
Commit: 6a3a994e9be8473fe47b997598c63f64163beba5
Parents: 7d5651e
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Wed Jan 13 22:53:59 2016 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Tue Feb 9 15:19:54 2016 -0500

----------------------------------------------------------------------
 .../activemq/artemis/utils/uri/URISchema.java   |    6 +-
 .../api/core/TransportConfiguration.java        |   94 +-
 .../TCPTransportConfigurationSchema.java        |   14 +-
 .../jms/server/embedded/EmbeddedJMS.java        |   21 +-
 .../jms/server/impl/JMSServerManagerImpl.java   |   34 +-
 .../protocol/openwire/OpenWireConnection.java   |  294 ++--
 .../openwire/OpenWireProtocolManager.java       |  197 ++-
 .../openwire/amq/AMQConnectionContext.java      |   19 +
 .../core/protocol/openwire/amq/AMQConsumer.java |   16 +-
 .../protocol/openwire/amq/AMQServerSession.java |    8 +-
 .../core/protocol/openwire/amq/AMQSession.java  |   24 +-
 .../artemis/core/config/Configuration.java      |    4 +
 .../core/config/impl/ConfigurationImpl.java     |   19 +
 .../core/remoting/impl/netty/NettyAcceptor.java |    2 +
 .../server/impl/RemotingServiceImpl.java        |    2 +-
 .../core/server/embedded/EmbeddedActiveMQ.java  |    5 +
 .../uri/ClusterConnectionConfigurationTest.java |    1 -
 tests/activemq5-unit-tests/pom.xml              |   39 +-
 .../broker/ArtemisBrokerWrapperFactory.java     |   32 +
 .../apache/activemq/broker/BrokerService.java   |  133 +-
 .../artemiswrapper/ArtemisBrokerBase.java       |   17 +-
 .../artemiswrapper/ArtemisBrokerWrapper.java    |   24 +-
 .../artemiswrapper/OpenwireArtemisBaseTest.java |  266 ++++
 .../transport/tcp/TcpTransportFactory.java      |   12 +-
 .../activemq/ActiveMQInputStreamTest.java       |  148 --
 .../apache/activemq/AutoFailTestSupport.java    |  159 ++
 .../apache/activemq/ConnectionCleanupTest.java  |   34 +-
 .../activemq/EmbeddedBrokerTestSupport.java     |   96 +-
 .../activemq/JmsQueueTransactionTest.java       |  234 +++
 .../activemq/JmsTransactionTestSupport.java     |  721 +++++++++
 .../org/apache/activemq/LargeStreamletTest.java |  170 --
 .../org/apache/activemq/broker/AMQ4351Test.java |  271 ----
 .../jmx/BrokerViewSlowStoreStartupTest.java     |  395 -----
 .../broker/jmx/HealthViewMBeanTest.java         |  119 --
 .../activemq/broker/jmx/Log4JConfigTest.java    |  194 ---
 .../broker/jmx/MBeanOperationTimeoutTest.java   |  136 --
 .../apache/activemq/broker/jmx/MBeanTest.java   | 1505 ------------------
 .../apache/activemq/broker/jmx/PurgeTest.java   |  258 ---
 .../broker/jmx/TransportConnectorMBeanTest.java |  141 --
 .../region/QueueDuplicatesFromStoreTest.java    |   87 +-
 .../region/SubscriptionAddRemoveQueueTest.java  |   21 +-
 .../region/cursors/NegativeQueueTest.java       |  432 -----
 .../broker/virtual/CompositeQueueTest.java      |  134 --
 .../broker/virtual/CompositeTopicTest.java      |   49 -
 .../DestinationInterceptorDurableSubTest.java   |  283 ----
 .../broker/virtual/FilteredQueueTest.java       |   36 -
 .../MirroredQueueCorrectMemoryUsageTest.java    |  167 --
 .../broker/virtual/MirroredQueueTest.java       |  116 --
 ...MirroredQueueUsingVirtualTopicQueueTest.java |   34 -
 .../broker/virtual/VirtualDestPerfTest.java     |  200 ---
 .../broker/virtual/VirtualTopicDLQTest.java     |  433 -----
 .../VirtualTopicDisconnectSelectorTest.java     |  188 ---
 .../broker/virtual/VirtualTopicPubSubTest.java  |  131 --
 .../VirtualTopicPubSubUsingXBeanTest.java       |   55 -
 .../virtual/VirtualTopicSelectorTest.java       |  108 --
 .../VirtualTopicsAndDurableSubsTest.java        |  117 --
 .../activemq/broker/virtual/composite-queue.xml |   47 -
 .../activemq/broker/virtual/composite-topic.xml |   47 -
 .../broker/virtual/disconnected-selector.xml    |   43 -
 .../activemq/broker/virtual/filtered-queue.xml  |   47 -
 .../broker/virtual/global-virtual-topics.xml    |   42 -
 .../broker/virtual/virtual-individual-dlq.xml   |   80 -
 .../virtual/virtual-topics-and-interceptor.xml  |   50 -
 .../java/org/apache/activemq/bugs/AMQ1282.java  |  206 ---
 .../org/apache/activemq/bugs/AMQ1687Test.java   |  106 --
 .../org/apache/activemq/bugs/AMQ1853Test.java   |  378 -----
 .../java/org/apache/activemq/bugs/AMQ1866.java  |  233 ---
 .../org/apache/activemq/bugs/AMQ1893Test.java   |  192 ---
 .../org/apache/activemq/bugs/AMQ1917Test.java   |  229 ---
 .../org/apache/activemq/bugs/AMQ1936Test.java   |  320 ----
 .../org/apache/activemq/bugs/AMQ2021Test.java   |  275 ----
 .../org/apache/activemq/bugs/AMQ2084Test.java   |  188 ---
 .../org/apache/activemq/bugs/AMQ2103Test.java   |  130 --
 .../activemq/bugs/AMQ2149LevelDBTest.java       |   30 -
 .../org/apache/activemq/bugs/AMQ2149Test.java   |  614 -------
 .../org/apache/activemq/bugs/AMQ2171Test.java   |  150 --
 .../org/apache/activemq/bugs/AMQ2200Test.java   |  100 --
 .../org/apache/activemq/bugs/AMQ2213Test.java   |  101 --
 .../org/apache/activemq/bugs/AMQ2314Test.java   |  181 ---
 .../org/apache/activemq/bugs/AMQ2356Test.java   |  192 ---
 .../org/apache/activemq/bugs/AMQ2364Test.java   |  113 --
 .../org/apache/activemq/bugs/AMQ2383Test.java   |   61 -
 .../org/apache/activemq/bugs/AMQ2401Test.java   |  235 ---
 .../org/apache/activemq/bugs/AMQ2413Test.java   |  344 ----
 .../org/apache/activemq/bugs/AMQ2439Test.java   |   94 --
 .../org/apache/activemq/bugs/AMQ2489Test.java   |  232 ---
 .../org/apache/activemq/bugs/AMQ2512Test.java   |  179 ---
 .../org/apache/activemq/bugs/AMQ2513Test.java   |  105 --
 .../org/apache/activemq/bugs/AMQ2528Test.java   |   79 -
 .../org/apache/activemq/bugs/AMQ2571Test.java   |  115 --
 .../org/apache/activemq/bugs/AMQ2580Test.java   |  195 ---
 .../activemq/bugs/AMQ2584ConcurrentDlqTest.java |  268 ----
 .../org/apache/activemq/bugs/AMQ2584Test.java   |  233 ---
 .../org/apache/activemq/bugs/AMQ2585Test.java   |   82 -
 .../org/apache/activemq/bugs/AMQ2616Test.java   |  118 --
 .../org/apache/activemq/bugs/AMQ2645Test.java   |  112 --
 .../org/apache/activemq/bugs/AMQ2736Test.java   |   98 --
 .../org/apache/activemq/bugs/AMQ2751Test.java   |   97 --
 .../org/apache/activemq/bugs/AMQ2801Test.java   |  199 ---
 .../org/apache/activemq/bugs/AMQ2832Test.java   |  379 -----
 .../org/apache/activemq/bugs/AMQ2870Test.java   |  227 ---
 .../org/apache/activemq/bugs/AMQ2902Test.java   |   96 --
 .../org/apache/activemq/bugs/AMQ2910Test.java   |  130 --
 .../org/apache/activemq/bugs/AMQ2982Test.java   |  184 ---
 .../org/apache/activemq/bugs/AMQ2983Test.java   |  165 --
 .../org/apache/activemq/bugs/AMQ3014Test.java   |  200 ---
 .../org/apache/activemq/bugs/AMQ3120Test.java   |  147 --
 .../org/apache/activemq/bugs/AMQ3140Test.java   |  146 --
 .../org/apache/activemq/bugs/AMQ3141Test.java   |  117 --
 .../org/apache/activemq/bugs/AMQ3145Test.java   |  129 --
 .../org/apache/activemq/bugs/AMQ3157Test.java   |  174 --
 .../org/apache/activemq/bugs/AMQ3167Test.java   |  471 ------
 .../org/apache/activemq/bugs/AMQ3274Test.java   |  763 ---------
 .../org/apache/activemq/bugs/AMQ3324Test.java   |  148 --
 .../org/apache/activemq/bugs/AMQ3352Test.java   |   74 -
 .../org/apache/activemq/bugs/AMQ3405Test.java   |  281 ----
 .../org/apache/activemq/bugs/AMQ3436Test.java   |  203 ---
 .../org/apache/activemq/bugs/AMQ3445Test.java   |  148 --
 .../org/apache/activemq/bugs/AMQ3454Test.java   |   75 -
 .../org/apache/activemq/bugs/AMQ3465Test.java   |  198 ---
 .../org/apache/activemq/bugs/AMQ3529Test.java   |  185 ---
 .../org/apache/activemq/bugs/AMQ3537Test.java   |  105 --
 .../org/apache/activemq/bugs/AMQ3567Test.java   |  212 ---
 .../org/apache/activemq/bugs/AMQ3622Test.java   |  109 --
 .../org/apache/activemq/bugs/AMQ3625Test.java   |  110 --
 .../org/apache/activemq/bugs/AMQ3674Test.java   |  122 --
 .../org/apache/activemq/bugs/AMQ3675Test.java   |  162 --
 .../org/apache/activemq/bugs/AMQ3678Test.java   |  216 ---
 .../org/apache/activemq/bugs/AMQ3732Test.java   |  178 ---
 .../org/apache/activemq/bugs/AMQ3779Test.java   |   77 -
 .../org/apache/activemq/bugs/AMQ3841Test.java   |  119 --
 .../org/apache/activemq/bugs/AMQ3879Test.java   |  113 --
 .../org/apache/activemq/bugs/AMQ3903Test.java   |  144 --
 .../org/apache/activemq/bugs/AMQ3932Test.java   |  164 --
 .../org/apache/activemq/bugs/AMQ3934Test.java   |  106 --
 .../org/apache/activemq/bugs/AMQ3961Test.java   |  185 ---
 .../org/apache/activemq/bugs/AMQ3992Test.java   |  106 --
 .../org/apache/activemq/bugs/AMQ4062Test.java   |  280 ----
 .../org/apache/activemq/bugs/AMQ4083Test.java   |  520 ------
 .../org/apache/activemq/bugs/AMQ4092Test.java   |  234 ---
 .../org/apache/activemq/bugs/AMQ4116Test.java   |  111 --
 .../org/apache/activemq/bugs/AMQ4126Test.java   |  181 ---
 .../org/apache/activemq/bugs/AMQ4133Test.java   |  107 --
 .../org/apache/activemq/bugs/AMQ4147Test.java   |  210 ---
 .../org/apache/activemq/bugs/AMQ4148Test.java   |   93 --
 .../org/apache/activemq/bugs/AMQ4157Test.java   |  178 ---
 .../org/apache/activemq/bugs/AMQ4160Test.java   |  380 -----
 .../org/apache/activemq/bugs/AMQ4212Test.java   |  357 -----
 .../org/apache/activemq/bugs/AMQ4213Test.java   |   88 -
 .../org/apache/activemq/bugs/AMQ4220Test.java   |  119 --
 .../org/apache/activemq/bugs/AMQ4221Test.java   |  274 ----
 .../org/apache/activemq/bugs/AMQ4222Test.java   |  187 ---
 .../org/apache/activemq/bugs/AMQ4323Test.java   |  160 --
 .../org/apache/activemq/bugs/AMQ4356Test.java   |  142 --
 .../org/apache/activemq/bugs/AMQ4361Test.java   |  160 --
 .../org/apache/activemq/bugs/AMQ4368Test.java   |  256 ---
 .../org/apache/activemq/bugs/AMQ4407Test.java   |  174 --
 .../org/apache/activemq/bugs/AMQ4413Test.java   |  246 ---
 .../org/apache/activemq/bugs/AMQ4469Test.java   |  113 --
 .../org/apache/activemq/bugs/AMQ4472Test.java   |   96 --
 .../org/apache/activemq/bugs/AMQ4475Test.java   |  361 -----
 .../bugs/AMQ4485LowLimitLevelDBTest.java        |   40 -
 .../activemq/bugs/AMQ4485LowLimitTest.java      |  473 ------
 ...XBrokersWithNDestsFanoutTransactionTest.java |  358 -----
 .../org/apache/activemq/bugs/AMQ4485Test.java   |  199 ---
 .../org/apache/activemq/bugs/AMQ4487Test.java   |  135 --
 .../org/apache/activemq/bugs/AMQ4504Test.java   |   83 -
 .../org/apache/activemq/bugs/AMQ4513Test.java   |  145 --
 .../org/apache/activemq/bugs/AMQ4517Test.java   |  129 --
 .../org/apache/activemq/bugs/AMQ4518Test.java   |  129 --
 .../org/apache/activemq/bugs/AMQ4530Test.java   |  115 --
 .../org/apache/activemq/bugs/AMQ4531Test.java   |  146 --
 .../org/apache/activemq/bugs/AMQ4554Test.java   |  107 --
 .../org/apache/activemq/bugs/AMQ4582Test.java   |   95 --
 .../org/apache/activemq/bugs/AMQ4595Test.java   |  158 --
 .../org/apache/activemq/bugs/AMQ4607Test.java   |  263 ---
 .../org/apache/activemq/bugs/AMQ4636Test.java   |  263 ---
 .../org/apache/activemq/bugs/AMQ4656Test.java   |  153 --
 .../org/apache/activemq/bugs/AMQ4671Test.java   |   83 -
 .../org/apache/activemq/bugs/AMQ4677Test.java   |  182 ---
 .../org/apache/activemq/bugs/AMQ4853Test.java   |  304 ----
 .../org/apache/activemq/bugs/AMQ4887Test.java   |  168 --
 .../org/apache/activemq/bugs/AMQ4893Test.java   |   86 -
 .../org/apache/activemq/bugs/AMQ4899Test.java   |  197 ---
 .../org/apache/activemq/bugs/AMQ4930Test.java   |  147 --
 .../org/apache/activemq/bugs/AMQ4950Test.java   |  197 ---
 .../org/apache/activemq/bugs/AMQ4952Test.java   |  513 ------
 .../org/apache/activemq/bugs/AMQ5035Test.java   |   83 -
 .../org/apache/activemq/bugs/AMQ5136Test.java   |   98 --
 .../org/apache/activemq/bugs/AMQ5212Test.java   |  225 ---
 .../activemq/bugs/AMQ5266SingleDestTest.java    |  617 -------
 .../bugs/AMQ5266StarvedConsumerTest.java        |  628 --------
 .../org/apache/activemq/bugs/AMQ5266Test.java   |  604 -------
 .../org/apache/activemq/bugs/AMQ5274Test.java   |  133 --
 .../org/apache/activemq/bugs/AMQ5381Test.java   |  178 ---
 .../org/apache/activemq/bugs/AMQ5421Test.java   |  119 --
 .../org/apache/activemq/bugs/AMQ5450Test.java   |  196 ---
 .../org/apache/activemq/bugs/AMQ5567Test.java   |  217 ---
 .../bugs/ActiveMQSlowConsumerManualTest.java    |  250 ---
 .../activemq/bugs/ConnectionPerMessageTest.java |  108 --
 .../org/apache/activemq/bugs/CraigsBugTest.java |   72 -
 .../apache/activemq/bugs/DoubleExpireTest.java  |  134 --
 .../activemq/bugs/DurableConsumerTest.java      |  479 ------
 .../bugs/JMSDurableTopicNoLocalTest.java        |   85 -
 .../bugs/JmsDurableTopicSlowReceiveTest.java    |  185 ---
 .../apache/activemq/bugs/JmsTimeoutTest.java    |  166 --
 .../bugs/MemoryUsageBlockResumeTest.java        |  221 ---
 .../activemq/bugs/MemoryUsageBrokerTest.java    |   93 --
 .../activemq/bugs/MemoryUsageCleanupTest.java   |  258 ---
 .../bugs/MessageExpirationReaperTest.java       |  185 ---
 .../org/apache/activemq/bugs/MessageSender.java |   49 -
 .../activemq/bugs/MissingDataFileTest.java      |  333 ----
 .../OptimizeAcknowledgeWithExpiredMsgsTest.java |  309 ----
 .../activemq/bugs/OutOfOrderTestCase.java       |  133 --
 .../activemq/bugs/QueueWorkerPrefetchTest.java  |  267 ----
 .../bugs/RawRollbackSharedConsumerTests.java    |  134 --
 .../apache/activemq/bugs/RawRollbackTests.java  |  135 --
 .../java/org/apache/activemq/bugs/Receiver.java |   22 -
 .../bugs/RedeliveryPluginHeaderTest.java        |  166 --
 .../apache/activemq/bugs/SlowConsumerTest.java  |  165 --
 ...ReplayAfterStoreCleanupLevelDBStoreTest.java |   30 -
 .../bugs/TempQueueDeleteOnCloseTest.java        |   54 -
 .../bugs/TempStorageBlockedBrokerTest.java      |  266 ----
 .../bugs/TempStorageConfigBrokerTest.java       |  220 ---
 .../activemq/bugs/TempStoreDataCleanupTest.java |  262 ---
 .../TransactedStoreUsageSuspendResumeTest.java  |  196 ---
 .../bugs/TransactionNotStartedErrorTest.java    |  298 ----
 .../bugs/TrapMessageInJDBCStoreTest.java        |  277 ----
 .../activemq/bugs/VMTransportClosureTest.java   |  135 --
 .../activemq/bugs/VerifySteadyEnqueueRate.java  |  148 --
 .../activemq/bugs/amq1095/ActiveMQTestCase.java |  158 --
 .../bugs/amq1095/MessageSelectorTest.java       |  218 ---
 .../apache/activemq/bugs/amq1095/activemq.xml   |   39 -
 .../activemq/bugs/amq1974/TryJmsClient.java     |  155 --
 .../activemq/bugs/amq1974/TryJmsManager.java    |  125 --
 .../bugs/amq3625/conf/JaasStompSSLBroker1.xml   |   65 -
 .../bugs/amq3625/conf/JaasStompSSLBroker2.xml   |   39 -
 .../bugs/amq3625/conf/groups2.properties        |   20 -
 .../activemq/bugs/amq3625/conf/login.config     |   22 -
 .../bugs/amq3625/conf/users2.properties         |   23 -
 .../activemq/bugs/amq3625/keys/broker2.ks       |    0
 .../activemq/bugs/amq3625/keys/client2.ks       |    0
 .../activemq/bugs/amq3625/keys/client2.ts       |    0
 ...InconsistentConnectorPropertiesBehaviour.xml |   46 -
 .../bugs/amq4126/JaasStompSSLBroker.xml         |   46 -
 .../apache/activemq/bugs/amq4126/dns.properties |   17 -
 .../activemq/bugs/amq4126/groups.properties     |   18 -
 .../apache/activemq/bugs/amq4126/login.config   |   30 -
 .../activemq/bugs/amq4126/users.properties      |   18 -
 .../apache/activemq/bugs/amq5035/activemq.xml   |  109 --
 .../bugs/embedded/EmbeddedActiveMQ.java         |  104 --
 .../activemq/bugs/embedded/ThreadExplorer.java  |  148 --
 .../network/CompressionOverNetworkTest.java     |    4 +-
 .../activemq/network/NetworkLoopBackTest.java   |    5 +-
 .../activemq/network/SimpleNetworkTest.java     |    6 +-
 .../store/AutoStorePerDestinationTest.java      |   44 -
 .../store/LevelDBStorePerDestinationTest.java   |   46 -
 .../activemq/store/MessagePriorityTest.java     |  584 -------
 .../apache/activemq/store/StoreOrderTest.java   |  274 ----
 .../activemq/store/StorePerDestinationTest.java |  314 ----
 .../store/jdbc/BrokenPersistenceAdapter.java    |   47 -
 .../store/jdbc/DatabaseLockerConfigTest.java    |   55 -
 .../store/jdbc/JDBCCommitExceptionTest.java     |  176 --
 .../jdbc/JDBCIOExceptionHandlerMockeryTest.java |  110 --
 .../store/jdbc/JDBCIOExceptionHandlerTest.java  |  330 ----
 .../activemq/store/jdbc/JDBCLockTablePrefix.xml |   58 -
 .../store/jdbc/JDBCLockTablePrefixTest.java     |   43 -
 .../store/jdbc/JDBCMessagePriorityTest.java     |  451 ------
 .../store/jdbc/JDBCNegativeQueueTest.java       |   93 --
 .../store/jdbc/JDBCNetworkBrokerDetachTest.java |   37 -
 .../store/jdbc/JDBCPersistenceAdapterTest.java  |   67 -
 .../store/jdbc/JDBCStoreAutoCommitTest.java     |  515 ------
 .../store/jdbc/JDBCStoreBrokerTest.java         |   60 -
 .../activemq/store/jdbc/JDBCStoreOrderTest.java |   62 -
 .../store/jdbc/JDBCTablePrefixAssignedTest.java |  133 --
 .../activemq/store/jdbc/JDBCTestMemory.java     |  157 --
 .../store/jdbc/JDBCXACommitExceptionTest.java   |  161 --
 .../store/jdbc/LeaseDatabaseLockerTest.java     |  273 ----
 .../activemq/store/kahadb/CustomLockerTest.java |   32 -
 .../store/kahadb/KahaDBFastEnqueueTest.java     |  249 ---
 .../store/kahadb/KahaDBIndexLocationTest.java   |  166 --
 .../store/kahadb/KahaDBMessagePriorityTest.java |   41 -
 .../kahadb/KahaDBPersistenceAdapterTest.java    |   39 -
 .../store/kahadb/KahaDBStoreBrokerTest.java     |   66 -
 .../store/kahadb/KahaDBStoreOrderTest.java      |   34 -
 .../kahadb/KahaDBStoreRecoveryBrokerTest.java   |  212 ---
 .../kahadb/KahaDBStoreRecoveryExpiryTest.java   |  113 --
 .../activemq/store/kahadb/KahaDBStoreTest.java  |  113 --
 .../activemq/store/kahadb/KahaDBTest.java       |  241 ---
 .../store/kahadb/KahaDBVersion1/db-1.log        |    0
 .../store/kahadb/KahaDBVersion1/db.data         |    0
 .../store/kahadb/KahaDBVersion1/db.redo         |    0
 .../store/kahadb/KahaDBVersion2/db-1.log        |    0
 .../store/kahadb/KahaDBVersion2/db.data         |    0
 .../store/kahadb/KahaDBVersion2/db.redo         |    0
 .../store/kahadb/KahaDBVersion3/db-1.log        |    0
 .../store/kahadb/KahaDBVersion3/db.data         |    0
 .../store/kahadb/KahaDBVersion3/db.redo         |    0
 .../store/kahadb/KahaDBVersion4/db-1.log        |    0
 .../store/kahadb/KahaDBVersion4/db.data         |    0
 .../store/kahadb/KahaDBVersion4/db.redo         |    0
 .../store/kahadb/KahaDBVersionTest.java         |  182 ---
 .../activemq/store/kahadb/NoSpaceIOTest.java    |  126 --
 .../activemq/store/kahadb/PBMesssagesTest.java  |   56 -
 .../store/kahadb/TempKahaDBStoreBrokerTest.java |   57 -
 .../store/kahadb/perf/KahaBulkLoadingTest.java  |  150 --
 .../kahadb/perf/KahaStoreDurableTopicTest.java  |   43 -
 .../store/kahadb/perf/KahaStoreQueueTest.java   |   45 -
 .../kahadb/perf/TempKahaStoreQueueTest.java     |   45 -
 .../KahaDBFilePendingMessageCursorTest.java     |   96 --
 .../activemq/store/kahadb/plist/PListTest.java  |  669 --------
 .../org/apache/activemq/store/kahadb/shared.xml |   59 -
 .../store/leveldb/LevelDBNegativeQueueTest.java |   38 -
 .../store/leveldb/LevelDBStoreBrokerTest.java   |   68 -
 .../activemq/store/schedulerDB/legacy/db-1.log  |    0
 .../store/schedulerDB/legacy/scheduleDB.data    |    0
 .../store/schedulerDB/legacy/scheduleDB.redo    |    0
 .../activemq/streams/JMSInputStreamTest.java    |  286 ----
 .../activemq/test/JmsResourceProvider.java      |  258 +++
 .../org/apache/activemq/test/TestSupport.java   |  256 +++
 .../activemq/transport/QueueClusterTest.java    |    5 +-
 .../transport/SoWriteTimeoutClientTest.java     |   77 +-
 .../activemq/transport/TopicClusterTest.java    |   71 +-
 .../transport/failover/AMQ1925Test.java         |  129 +-
 .../transport/failover/BadConnectionTest.java   |   85 -
 .../transport/failover/ClusterUtil.java         |   25 +
 .../failover/ConnectionHangOnStartupTest.java   |   33 +-
 .../failover/FailoverBackupLeakTest.java        |   80 +-
 .../transport/failover/FailoverClusterTest.java |  171 +-
 .../failover/FailoverComplexClusterTest.java    |  379 +++--
 .../FailoverConsumerOutstandingCommitTest.java  |  227 +--
 .../FailoverConsumerUnconsumedTest.java         |  242 +--
 .../failover/FailoverDuplicateTest.java         |  151 +-
 .../failover/FailoverPrefetchZeroTest.java      |  119 +-
 .../failover/FailoverPriorityTest.java          |  270 ++--
 .../transport/failover/FailoverRandomTest.java  |   64 +-
 .../FailoverRedeliveryTransactionTest.java      |   28 +-
 .../transport/failover/FailoverTimeoutTest.java |   36 +-
 .../failover/FailoverTransactionTest.java       |  672 +++-----
 .../failover/FailoverTransportBackupsTest.java  |   52 +-
 .../failover/FailoverTransportBrokerTest.java   |  219 ++-
 .../FailoverTransportUriHandlingTest.java       |    1 -
 .../failover/FailoverUpdateURIsTest.java        |   91 +-
 .../transport/failover/FailoverUriTest.java     |    1 +
 .../failover/InitalReconnectDelayTest.java      |   70 +-
 .../transport/failover/ReconnectTest.java       |   60 +-
 .../transport/failover/SlowConnectionTest.java  |    9 +-
 .../failover/TwoBrokerFailoverClusterTest.java  |  160 +-
 .../activemq/transport/fanout/FanoutTest.java   |   30 +-
 .../fanout/FanoutTransportBrokerTest.java       |  218 ++-
 .../transport/tcp/InactivityMonitorTest.java    |   19 +-
 .../transport/tcp/TransportUriTest.java         |   10 -
 .../transport/vm/VMTransportBrokerNameTest.java |   50 -
 .../transport/vm/VMTransportBrokerTest.java     |   38 -
 .../vm/VMTransportEmbeddedBrokerTest.java       |  104 --
 .../transport/vm/VMTransportThreadSafeTest.java |  937 -----------
 .../transport/vm/VMTransportWaitForTest.java    |  139 --
 .../vm/VmTransportNetworkBrokerTest.java        |  151 --
 .../org/apache/activemq/util/LockFileTest.java  |   70 +
 .../org/apache/activemq/util/SocketProxy.java   |  396 +++++
 .../java/org/apache/activemq/util/Wait.java     |   50 +
 361 files changed, 5356 insertions(+), 50719 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6a3a994e/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/URISchema.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/URISchema.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/URISchema.java
index 25ce8e9..2760e8a 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/URISchema.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/URISchema.java
@@ -192,7 +192,8 @@ public abstract class URISchema<T, P> {
    public static void setData(URI uri,
                               HashMap<String, Object> properties,
                               Set<String> allowableProperties,
-                              Map<String, String> query) {
+                              Map<String, String> query,
+                              Map<String, Object> extraProps) {
       if (allowableProperties.contains("host")) {
          properties.put("host", "" + uri.getHost());
       }
@@ -206,6 +207,9 @@ public abstract class URISchema<T, P> {
          if (allowableProperties.contains(entry.getKey())) {
             properties.put(entry.getKey(), entry.getValue());
          }
+         else {
+            extraProps.put(entry.getKey(), entry.getValue());
+         }
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6a3a994e/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
index 6f91537..d02c275 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
@@ -51,6 +51,8 @@ public class TransportConfiguration implements Serializable {
 
    private Map<String, Object> params;
 
+   private Map<String, Object> extraProps;
+
    private static final byte TYPE_BOOLEAN = 0;
 
    private static final byte TYPE_INT = 1;
@@ -93,6 +95,19 @@ public class TransportConfiguration implements Serializable {
     * @param name      The name of this TransportConfiguration
     */
    public TransportConfiguration(final String className, final Map<String, Object> params, final String name) {
+      this(className, params, name, null);
+   }
+
+   /**
+    * Creates a TransportConfiguration with a specific name providing the class name of the {@link org.apache.activemq.artemis.spi.core.remoting.ConnectorFactory}
+    * and any parameters needed.
+    *
+    * @param className   The class name of the ConnectorFactory
+    * @param params      The parameters needed by the ConnectorFactory
+    * @param name        The name of this TransportConfiguration
+    * @param extraProps  The extra properties that specific to protocols
+    */
+   public TransportConfiguration(final String className, final Map<String, Object> params, final String name, final Map<String, Object> extraProps) {
       factoryClassName = className;
 
       if (params == null || params.isEmpty()) {
@@ -103,6 +118,7 @@ public class TransportConfiguration implements Serializable {
       }
 
       this.name = name;
+      this.extraProps = extraProps;
    }
 
    public TransportConfiguration newTransportConfig(String newName) {
@@ -156,6 +172,13 @@ public class TransportConfiguration implements Serializable {
       return params;
    }
 
+   public Map<String, Object> getAllParams() {
+      Map<String, Object> allParams = new HashMap<>(params);
+      if (extraProps != null) {
+         allParams.putAll(extraProps);
+      }
+      return allParams;
+   }
 
    @Override
    public int hashCode() {
@@ -249,10 +272,52 @@ public class TransportConfiguration implements Serializable {
 
             first = false;
          }
+         if (extraProps != null) {
+            for (Map.Entry<String, Object> entry : extraProps.entrySet()) {
+               if (!first) {
+                  str.append("&");
+               }
+
+               String key = entry.getKey();
+               String val = entry.getValue() == null ? "null" : entry.getValue().toString();
+
+               str.append(replaceWildcardChars(key)).append('=').append(replaceWildcardChars(val));
+
+               first = false;
+            }
+         }
       }
       return str.toString();
    }
 
+   private void encodeMap(final ActiveMQBuffer buffer, final Map<String, Object> map) {
+      for (Map.Entry<String, Object> entry : map.entrySet()) {
+         buffer.writeString(entry.getKey());
+
+         Object val = entry.getValue();
+
+         if (val instanceof Boolean) {
+            buffer.writeByte(TransportConfiguration.TYPE_BOOLEAN);
+            buffer.writeBoolean((Boolean) val);
+         }
+         else if (val instanceof Integer) {
+            buffer.writeByte(TransportConfiguration.TYPE_INT);
+            buffer.writeInt((Integer) val);
+         }
+         else if (val instanceof Long) {
+            buffer.writeByte(TransportConfiguration.TYPE_LONG);
+            buffer.writeLong((Long) val);
+         }
+         else if (val instanceof String) {
+            buffer.writeByte(TransportConfiguration.TYPE_STRING);
+            buffer.writeString((String) val);
+         }
+         else {
+            throw ActiveMQClientMessageBundle.BUNDLE.invalidEncodeType(val);
+         }
+      }
+   }
+
    /**
     * Encodes this TransportConfiguration into a buffer.
     * <p>
@@ -267,31 +332,10 @@ public class TransportConfiguration implements Serializable {
       buffer.writeInt(params == null ? 0 : params.size());
 
       if (params != null) {
-         for (Map.Entry<String, Object> entry : params.entrySet()) {
-            buffer.writeString(entry.getKey());
-
-            Object val = entry.getValue();
-
-            if (val instanceof Boolean) {
-               buffer.writeByte(TransportConfiguration.TYPE_BOOLEAN);
-               buffer.writeBoolean((Boolean) val);
-            }
-            else if (val instanceof Integer) {
-               buffer.writeByte(TransportConfiguration.TYPE_INT);
-               buffer.writeInt((Integer) val);
-            }
-            else if (val instanceof Long) {
-               buffer.writeByte(TransportConfiguration.TYPE_LONG);
-               buffer.writeLong((Long) val);
-            }
-            else if (val instanceof String) {
-               buffer.writeByte(TransportConfiguration.TYPE_STRING);
-               buffer.writeString((String) val);
-            }
-            else {
-               throw ActiveMQClientMessageBundle.BUNDLE.invalidEncodeType(val);
-            }
-         }
+         encodeMap(buffer, params);
+      }
+      if (extraProps != null) {
+         encodeMap(buffer, extraProps);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6a3a994e/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/TCPTransportConfigurationSchema.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/TCPTransportConfigurationSchema.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/TCPTransportConfigurationSchema.java
index 309e3e4..628a8ed 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/TCPTransportConfigurationSchema.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/TCPTransportConfigurationSchema.java
@@ -60,10 +60,13 @@ public class TCPTransportConfigurationSchema extends AbstractTransportConfigurat
                                                                          String factoryName) throws URISyntaxException {
       HashMap<String, Object> props = new HashMap<>();
 
-      setData(uri, props, allowableProperties, query);
+      Map<String, Object> extraProps = new HashMap<>();
+      setData(uri, props, allowableProperties, query, extraProps);
       List<TransportConfiguration> transportConfigurations = new ArrayList<>();
 
-      transportConfigurations.add(new TransportConfiguration(factoryName, props, name));
+      TransportConfiguration config = new TransportConfiguration(factoryName, props, name, extraProps);
+
+      transportConfigurations.add(config);
       String connectors = uri.getFragment();
 
       if (connectors != null && !connectors.trim().isEmpty()) {
@@ -71,9 +74,10 @@ public class TCPTransportConfigurationSchema extends AbstractTransportConfigurat
          for (String s : split) {
             URI extraUri = new URI(s);
             HashMap<String, Object> newProps = new HashMap<>();
-            setData(extraUri, newProps, allowableProperties, query);
-            setData(extraUri, newProps, allowableProperties, parseQuery(extraUri.getQuery(), null));
-            transportConfigurations.add(new TransportConfiguration(factoryName, newProps, name + ":" + extraUri.toString()));
+            extraProps = new HashMap<>();
+            setData(extraUri, newProps, allowableProperties, query, extraProps);
+            setData(extraUri, newProps, allowableProperties, parseQuery(extraUri.getQuery(), null), extraProps);
+            transportConfigurations.add(new TransportConfiguration(factoryName, newProps, name + ":" + extraUri.toString(), extraProps));
          }
       }
       return transportConfigurations;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6a3a994e/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/embedded/EmbeddedJMS.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/embedded/EmbeddedJMS.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/embedded/EmbeddedJMS.java
index 7e235a6..bee15f0 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/embedded/EmbeddedJMS.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/embedded/EmbeddedJMS.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.jms.server.embedded;
 
 import javax.naming.Context;
 
+import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.FileDeploymentManager;
 import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
 import org.apache.activemq.artemis.core.registry.MapBindingRegistry;
@@ -80,13 +81,19 @@ public class EmbeddedJMS extends EmbeddedActiveMQ {
       return this;
    }
 
-   /**
-    * Lookup in the registry for registered object, i.e. a ConnectionFactory.
-    * <p>
-    * This is a convenience method.
-    *
-    * @param name
-    */
+
+   public EmbeddedJMS setConfiguration(Configuration configuration) {
+      super.setConfiguration(configuration);
+      return this;
+   }
+
+      /**
+       * Lookup in the registry for registered object, i.e. a ConnectionFactory.
+       * <p>
+       * This is a convenience method.
+       *
+       * @param name
+       */
    public Object lookup(String name) {
       return serverManager.getRegistry().lookup(name);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6a3a994e/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
index 35f584b..9872d0f 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
@@ -1047,28 +1047,32 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
    private boolean internalCreateQueue(final String queueName,
                                        final String selectorString,
                                        final boolean durable) throws Exception {
-      if (queues.get(queueName) != null) {
-         return false;
-      }
-      else {
-         ActiveMQQueue activeMQQueue = ActiveMQDestination.createQueue(queueName);
+      // TODO: there was an openwire test failng because of this
+      //       is this really needed for FailoverClusterTest ?
+      synchronized (queues) {
+         if (queues.get(queueName) != null) {
+            return false;
+         }
+         else {
+            ActiveMQQueue activeMQQueue = ActiveMQDestination.createQueue(queueName);
 
-         // Convert from JMS selector to core filter
-         String coreFilterString = null;
+            // Convert from JMS selector to core filter
+            String coreFilterString = null;
 
-         if (selectorString != null) {
-            coreFilterString = SelectorTranslator.convertToActiveMQFilterString(selectorString);
-         }
+            if (selectorString != null) {
+               coreFilterString = SelectorTranslator.convertToActiveMQFilterString(selectorString);
+            }
 
-         Queue queue = server.deployQueue(SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(coreFilterString), durable, false);
+            Queue queue = server.deployQueue(SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(coreFilterString), durable, false);
 
-         queues.put(queueName, activeMQQueue);
+            queues.put(queueName, activeMQQueue);
 
-         this.recoverregistryBindings(queueName, PersistedType.Queue);
+            this.recoverregistryBindings(queueName, PersistedType.Queue);
 
-         jmsManagementService.registerQueue(activeMQQueue, queue);
+            jmsManagementService.registerQueue(activeMQQueue, queue);
 
-         return true;
+            return true;
+         }
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6a3a994e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index c2c535e..61e93cb 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.protocol.openwire;
 
+import javax.jms.InvalidClientIDException;
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSSecurityException;
 import javax.jms.ResourceAllocationException;
@@ -54,8 +55,8 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.ConcurrentHashSet;
+import org.apache.activemq.artemis.utils.ConfigurationHelper;
 import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionControl;
@@ -79,7 +80,6 @@ import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.ProducerAck;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionId;
@@ -100,6 +100,7 @@ import org.apache.activemq.wireformat.WireFormat;
 
 /**
  * Represents an activemq connection.
+ * ToDo: extends AbstractRemotingConnection
  */
 public class OpenWireConnection implements RemotingConnection, CommandVisitor, SecurityAuth {
 
@@ -121,7 +122,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
 
    private final Acceptor acceptorUsed;
 
-   private OpenWireFormat wireFormat;
+   private final OpenWireFormat wireFormat;
 
    private AMQConnectionContext context;
 
@@ -150,6 +151,12 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
 
    private volatile AMQSession advisorySession;
 
+   private String defaultSocketURIString;
+
+   private boolean rebalance;
+   private boolean updateClusterClients;
+   private boolean updateClusterClientsOnRemove;
+
    public OpenWireConnection(Acceptor acceptorUsed,
                              Connection connection,
                              OpenWireProtocolManager openWireProtocolManager,
@@ -159,6 +166,13 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
       this.acceptorUsed = acceptorUsed;
       this.wireFormat = wf;
       this.creationTime = System.currentTimeMillis();
+      this.defaultSocketURIString = connection.getLocalAddress();
+
+      //Clebert: These are parameters specific to openwire cluster with defaults as specified at
+      //http://activemq.apache.org/failover-transport-reference.html
+      rebalance = ConfigurationHelper.getBooleanProperty("rebalance-cluster-clients", true, acceptorUsed.getConfiguration());
+      updateClusterClients = ConfigurationHelper.getBooleanProperty("update-cluster-clients", true, acceptorUsed.getConfiguration());
+      updateClusterClientsOnRemove = ConfigurationHelper.getBooleanProperty("update-cluster-clients-on-remove", true, acceptorUsed.getConfiguration());
    }
 
    @Override
@@ -186,6 +200,9 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
       return info.getPassword();
    }
 
+   public boolean isRebalance() {
+      return rebalance;
+   }
 
    private ConnectionInfo getConnectionInfo() {
       if (state == null) {
@@ -209,6 +226,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
 
          Command command = (Command) wireFormat.unmarshal(buffer);
 
+         //logger.log("got command: " + command);
          boolean responseRequired = command.isResponseRequired();
          int commandId = command.getCommandId();
          // the connection handles pings, negotiations directly.
@@ -224,10 +242,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
             // amq here starts a read/write monitor thread (detect ttl?)
             negotiate((WireFormatInfo) command);
          }
-         else if (command.getClass() == ConnectionInfo.class || command.getClass() == ConsumerInfo.class || command.getClass() == RemoveInfo.class ||
-                  command.getClass() == SessionInfo.class || command.getClass() == ProducerInfo.class || ActiveMQMessage.class.isAssignableFrom(command.getClass()) ||
-                  command.getClass() == MessageAck.class || command.getClass() == TransactionInfo.class || command.getClass() == DestinationInfo.class ||
-                  command.getClass() == ShutdownInfo.class || command.getClass() == RemoveSubscriptionInfo.class) {
+         else {
             Response response = null;
 
             if (pendingStop) {
@@ -235,6 +250,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
             }
             else {
                try {
+                  setLastCommand(command);
                   response = command.visit(this);
                }
                catch (Exception e) {
@@ -242,6 +258,9 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
                      response = new ExceptionResponse(e);
                   }
                }
+               finally {
+                  setLastCommand(null);
+               }
 
                if (response instanceof ExceptionResponse) {
                   if (!responseRequired) {
@@ -255,6 +274,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
             if (responseRequired) {
                if (response == null) {
                   response = new Response();
+                  response.setCorrelationId(command.getCommandId());
                }
             }
 
@@ -273,11 +293,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
             }
 
          }
-         else {
-            // note!!! wait for negotiation (e.g. use a countdown latch)
-            // before handling any other commands
-            this.protocolManager.handleCommand(this, command);
-         }
       }
       catch (IOException e) {
          ActiveMQServerLogger.LOGGER.error("error decoding", e);
@@ -287,6 +302,12 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
       }
    }
 
+   private void setLastCommand(Command command) {
+      if (context != null) {
+         context.setLastCommand(command);
+      }
+   }
+
    private void negotiate(WireFormatInfo command) throws IOException {
       this.wireFormat.renegotiateWireFormat(command);
       //throw back a brokerInfo here
@@ -390,36 +411,12 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
 
    @Override
    public void fail(ActiveMQException me) {
-      if (me != null) {
-         ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
-      }
-
-      // Then call the listeners
-      callFailureListeners(me);
-
-      callClosingListeners();
-
-      destroyed = true;
-
-      transportConnection.close();
+      fail(me, null);
    }
 
    @Override
    public void destroy() {
-      destroyed = true;
-
-      transportConnection.close();
-
-      try {
-         deleteTempQueues();
-      }
-      catch (Exception e) {
-         //log warning
-      }
-
-      synchronized (sendLock) {
-         callClosingListeners();
-      }
+      fail(null, null);
    }
 
    private void deleteTempQueues() throws Exception {
@@ -452,7 +449,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
 
    @Override
    public void disconnect(boolean criticalError) {
-      fail(null);
+      this.disconnect(null, null, criticalError);
    }
 
    @Override
@@ -529,39 +526,9 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
 
    @Override
    public Response processAddConnection(ConnectionInfo info) throws Exception {
-      WireFormatInfo wireFormatInfo = wireFormat.getPreferedWireFormatInfo();
-      // Older clients should have been defaulting this field to true.. but
-      // they were not.
-      if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
-         info.setClientMaster(true);
-      }
-
-      state = new ConnectionState(info);
-
-      context = new AMQConnectionContext();
-
-      state.reset(info);
-
-      // Setup the context.
-      String clientId = info.getClientId();
-      context.setBroker(protocolManager);
-      context.setClientId(clientId);
-      context.setClientMaster(info.isClientMaster());
-      context.setConnection(this);
-      context.setConnectionId(info.getConnectionId());
-      // for now we pass the manager as the connector and see what happens
-      // it should be related to activemq's Acceptor
-      context.setFaultTolerant(info.isFaultTolerant());
-      context.setUserName(info.getUserName());
-      context.setWireFormatInfo(wireFormatInfo);
-      context.setReconnect(info.isFailoverReconnect());
-      context.setConnectionState(state);
-      if (info.getClientIp() == null) {
-         info.setClientIp(getRemoteAddress());
-      }
-
+      //let protoclmanager handle connection add/remove
       try {
-         protocolManager.addConnection(context, info);
+         protocolManager.addConnection(this, info);
       }
       catch (Exception e) {
          if (e instanceof SecurityException) {
@@ -572,9 +539,9 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
          Response resp = new ExceptionResponse(e);
          return resp;
       }
-      if (info.isManageable()) {
+      if (info.isManageable() && this.isUpdateClusterClients()) {
          // send ConnectionCommand
-         ConnectionControl command = new ConnectionControl();
+         ConnectionControl command = protocolManager.newConnectionControl(rebalance);
          command.setFaultTolerant(protocolManager.isFaultTolerantConfiguration());
          if (info.isFailoverReconnect()) {
             command.setRebalanceConnection(false);
@@ -582,6 +549,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
          dispatchAsync(command);
       }
       return null;
+
    }
 
    public void dispatchAsync(Command message) {
@@ -768,6 +736,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
          protocolManager.addConsumer(this, info);
       }
       catch (Exception e) {
+         e.printStackTrace();
          if (e instanceof ActiveMQSecurityException) {
             resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
          }
@@ -917,8 +886,11 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
    }
 
    @Override
-   public Response processConnectionControl(ConnectionControl arg0) throws Exception {
-      throw new IllegalStateException("not implemented! ");
+   public Response processConnectionControl(ConnectionControl connectionControl) throws Exception {
+      //activemq5 keeps a var to remember only the faultTolerant flag
+      //this can be sent over a reconnected transport as the first command
+      //before restoring the connection.
+      return null;
    }
 
    @Override
@@ -927,8 +899,16 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
    }
 
    @Override
-   public Response processConsumerControl(ConsumerControl arg0) throws Exception {
-      throw new IllegalStateException("not implemented! ");
+   public Response processConsumerControl(ConsumerControl consumerControl) throws Exception {
+      //amq5 clients send this command to restore prefetchSize
+      //after successful reconnect
+      try {
+         protocolManager.updateConsumer(this, consumerControl);
+      }
+      catch (Exception e) {
+         //log error
+      }
+      return null;
    }
 
    @Override
@@ -1089,21 +1069,9 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
 
    @Override
    public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
-      // Don't allow things to be added to the connection state while we
-      // are shutting down.
-      state.shutdown();
-      // Cascade the connection stop to the sessions.
-      for (SessionId sessionId : state.getSessionIds()) {
-         try {
-            processRemoveSession(sessionId, lastDeliveredSequenceId);
-         }
-         catch (Throwable e) {
-            // LOG
-         }
-      }
-
+      //we let protocol manager to handle connection add/remove
       try {
-         protocolManager.removeConnection(context, state.getInfo(), null);
+         protocolManager.removeConnection(this, state.getInfo(), null);
       }
       catch (Throwable e) {
          // log
@@ -1162,15 +1130,9 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
       // Don't let new consumers or producers get added while we are closing
       // this down.
       session.shutdown();
-      // Cascade the connection stop to the consumers and producers.
-      for (ConsumerId consumerId : session.getConsumerIds()) {
-         try {
-            processRemoveConsumer(consumerId, lastDeliveredSequenceId);
-         }
-         catch (Throwable e) {
-            // LOG.warn("Failed to remove consumer: {}", consumerId, e);
-         }
-      }
+      // Cascade the connection stop producers.
+      // we don't stop consumer because in core
+      // closing the session will do the job
       for (ProducerId producerId : session.getProducerIds()) {
          try {
             processRemoveProducer(producerId);
@@ -1200,6 +1162,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
 
    @Override
    public Response processShutdown(ShutdownInfo info) throws Exception {
+      this.shutdown(false);
       return null;
    }
 
@@ -1234,14 +1197,63 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
       tempQueues.add(queue);
    }
 
+   private void shutdown(boolean fail) {
+      if (fail) {
+         transportConnection.forceClose();
+      }
+      else {
+         transportConnection.close();
+      }
+   }
+
+   private void disconnect(ActiveMQException me, String reason, boolean fail) {
+
+      if (context == null || destroyed) {
+         return;
+      }
+      // Don't allow things to be added to the connection state while we
+      // are shutting down.
+      // is it necessary? even, do we need state at all?
+      state.shutdown();
+
+      // Then call the listeners
+      // this should closes underlying sessions
+      callFailureListeners(me);
+
+      // this should clean up temp dests
+      synchronized (sendLock) {
+         callClosingListeners();
+      }
+
+      destroyed = true;
+
+      //before closing transport, send the last response if any
+      Command command = context.getLastCommand();
+      if (command != null && command.isResponseRequired()) {
+         Response lastResponse = new Response();
+         lastResponse.setCorrelationId(command.getCommandId());
+         dispatchSync(lastResponse);
+         context.setDontSendReponse(true);
+      }
+   }
+
    @Override
    public void disconnect(String reason, boolean fail) {
-      destroy();
+      this.disconnect(null, reason, fail);
    }
 
    @Override
-   public void fail(ActiveMQException e, String message) {
-      destroy();
+   public void fail(ActiveMQException me, String message) {
+      if (me != null) {
+         ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
+      }
+      try {
+         protocolManager.removeConnection(this, this.getConnectionInfo(), me);
+      }
+      catch (InvalidClientIDException e) {
+         ActiveMQServerLogger.LOGGER.warn("Couldn't close connection because invalid clientID", e);
+      }
+      shutdown(true);
    }
 
    public void setAdvisorySession(AMQSession amqSession) {
@@ -1252,8 +1264,82 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
       return this.advisorySession;
    }
 
-   public AMQConnectionContext getConext() {
+   public AMQConnectionContext getContext() {
       return this.context;
    }
 
+   public String getDefaultSocketURIString() {
+      return defaultSocketURIString;
+   }
+
+   public void updateClient(ConnectionControl control) {
+      //      if (!destroyed && context.isFaultTolerant()) {
+      if (updateClusterClients) {
+         dispatchAsync(control);
+      }
+      //      }
+   }
+
+   public boolean isUpdateClusterClients() {
+      return updateClusterClients;
+   }
+
+   public AMQConnectionContext initContext(ConnectionInfo info) {
+      WireFormatInfo wireFormatInfo = wireFormat.getPreferedWireFormatInfo();
+      // Older clients should have been defaulting this field to true.. but
+      // they were not.
+      if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
+         info.setClientMaster(true);
+      }
+
+      state = new ConnectionState(info);
+
+      context = new AMQConnectionContext();
+
+      state.reset(info);
+
+      // Setup the context.
+      String clientId = info.getClientId();
+      context.setBroker(protocolManager);
+      context.setClientId(clientId);
+      context.setClientMaster(info.isClientMaster());
+      context.setConnection(this);
+      context.setConnectionId(info.getConnectionId());
+      // for now we pass the manager as the connector and see what happens
+      // it should be related to activemq's Acceptor
+      context.setFaultTolerant(info.isFaultTolerant());
+      context.setUserName(info.getUserName());
+      context.setWireFormatInfo(wireFormatInfo);
+      context.setReconnect(info.isFailoverReconnect());
+      context.setConnectionState(state);
+      if (info.getClientIp() == null) {
+         info.setClientIp(getRemoteAddress());
+      }
+
+      return context;
+   }
+
+   //raise the refCount of context
+   public void reconnect(AMQConnectionContext existingContext, ConnectionInfo info) {
+      this.context = existingContext;
+      WireFormatInfo wireFormatInfo = wireFormat.getPreferedWireFormatInfo();
+      // Older clients should have been defaulting this field to true.. but
+      // they were not.
+      if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
+         info.setClientMaster(true);
+      }
+      if (info.getClientIp() == null) {
+         info.setClientIp(getRemoteAddress());
+      }
+
+      state = new ConnectionState(info);
+      state.reset(info);
+
+      context.setConnection(this);
+      context.setConnectionState(state);
+      context.setClientMaster(info.isClientMaster());
+      context.setFaultTolerant(info.isFaultTolerant());
+      context.setReconnect(true);
+      context.incRefCount();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6a3a994e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index f916c8f..525844c 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -37,6 +38,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Interceptor;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
+import org.apache.activemq.artemis.api.core.client.TopologyMember;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.core.io.IOCallback;
@@ -52,6 +55,8 @@ import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
+import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.core.server.management.NotificationListener;
@@ -70,15 +75,15 @@ import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
-import org.apache.activemq.command.CommandTypes;
+import org.apache.activemq.command.ConnectionControl;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerControl;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
@@ -97,7 +102,7 @@ import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.InetAddressUtil;
 import org.apache.activemq.util.LongSequenceGenerator;
 
-public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, NotificationListener {
+public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, NotificationListener, ClusterTopologyListener {
 
    private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
    private static final IdGenerator ID_GENERATOR = new IdGenerator();
@@ -122,17 +127,25 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
    private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<>();
 
    protected final ConcurrentMap<ConnectionId, ConnectionInfo> connectionInfos = new ConcurrentHashMap<>();
-
-   private final Map<String, AMQConnectionContext> clientIdSet = new HashMap<>();
+   // Clebert TODO: use ConcurrentHashMap, or maybe use the schema that's already available on Artemis upstream (unique-client-id)
+   private final Map<String, AMQConnectionContext> clientIdSet = new HashMap<String, AMQConnectionContext>();
 
    private String brokerName;
 
+   // Clebert TODO: Artemis already stores the Session. Why do we need a different one here
    private Map<SessionId, AMQSession> sessions = new ConcurrentHashMap<>();
 
+   // Clebert: Artemis already has a Resource Manager. Need to remove this..
+   //          The TransactionID extends XATransactionID, so all we need is to convert the XID here
    private Map<TransactionId, AMQSession> transactions = new ConcurrentHashMap<>();
 
+   // Clebert: Artemis session has meta-data support, perhaps we could reuse it here
    private Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<>();
 
+   private final Map<String, TopologyMember> topologyMap = new ConcurrentHashMap<>();
+
+   private final LinkedList<TopologyMember> members = new LinkedList<>();
+
    private final ScheduledExecutorService scheduledPool;
 
    public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
@@ -148,6 +161,37 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
          service.addNotificationListener(this);
       }
 
+      final ClusterManager clusterManager = this.server.getClusterManager();
+      ClusterConnection cc = clusterManager.getDefaultConnection(null);
+      if (cc != null) {
+         cc.addClusterTopologyListener(this);
+      }
+   }
+
+   @Override
+   public void nodeUP(TopologyMember member, boolean last) {
+      if (topologyMap.put(member.getNodeId(), member) == null) {
+         updateClientClusterInfo();
+      }
+   }
+
+   public void nodeDown(long eventUID, String nodeID) {
+      if (topologyMap.remove(nodeID) != null) {
+         updateClientClusterInfo();
+      }
+   }
+
+   private void updateClientClusterInfo() {
+
+      synchronized (members) {
+         members.clear();
+         members.addAll(topologyMap.values());
+      }
+
+      for (OpenWireConnection c : this.connections) {
+         ConnectionControl control = newConnectionControl(c.isRebalance());
+         c.updateClient(control);
+      }
    }
 
    @Override
@@ -172,6 +216,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       OpenWireConnection owConn = new OpenWireConnection(acceptorUsed, connection, this, wf);
       owConn.init();
 
+      // TODO CLEBERT What is this constant here? we should get it from TTL initial pings
       return new ConnectionEntry(owConn, null, System.currentTimeMillis(), 1 * 60 * 1000);
    }
 
@@ -229,28 +274,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
 
    }
 
-   public void handleCommand(OpenWireConnection openWireConnection, Object command) throws Exception {
-      Command amqCmd = (Command) command;
-      byte type = amqCmd.getDataStructureType();
-      switch (type) {
-         case CommandTypes.CONNECTION_INFO:
-            break;
-         case CommandTypes.CONNECTION_CONTROL:
-            /** The ConnectionControl packet sent from client informs the broker that is capable of supporting dynamic
-             * failover and load balancing.  These features are not yet implemented for Artemis OpenWire.  Instead we
-             * simply drop the packet.  See: ACTIVEMQ6-108 */
-            break;
-         case CommandTypes.MESSAGE_PULL:
-            MessagePull messagePull = (MessagePull) amqCmd;
-            openWireConnection.processMessagePull(messagePull);
-            break;
-         case CommandTypes.CONSUMER_CONTROL:
-            break;
-         default:
-            throw new IllegalStateException("Cannot handle command: " + command);
-      }
-   }
-
    public void sendReply(final OpenWireConnection connection, final Command command) {
       server.getStorageManager().afterCompleteOperations(new IOCallback() {
          @Override
@@ -287,50 +310,50 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       }
    }
 
-   public void addConnection(AMQConnectionContext context, ConnectionInfo info) throws Exception {
+   public void addConnection(OpenWireConnection connection, ConnectionInfo info) throws Exception {
       String username = info.getUserName();
       String password = info.getPassword();
 
       if (!this.validateUser(username, password)) {
          throw new SecurityException("User name [" + username + "] or password is invalid.");
       }
+
       String clientId = info.getClientId();
       if (clientId == null) {
          throw new InvalidClientIDException("No clientID specified for connection request");
       }
+
       synchronized (clientIdSet) {
-         AMQConnectionContext oldContext = clientIdSet.get(clientId);
-         if (oldContext != null) {
-            if (context.isAllowLinkStealing()) {
-               clientIdSet.remove(clientId);
-               if (oldContext.getConnection() != null) {
-                  OpenWireConnection connection = oldContext.getConnection();
-                  connection.disconnect(true);
-               }
-               else {
-                  // log error
-               }
+         AMQConnectionContext context;
+         context = clientIdSet.get(clientId);
+         if (context != null) {
+            if (info.isFailoverReconnect()) {
+               OpenWireConnection oldConnection = context.getConnection();
+               oldConnection.disconnect(true);
+               connections.remove(oldConnection);
+               connection.reconnect(context, info);
             }
             else {
-               throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " + oldContext.getConnection().getRemoteAddress());
+               throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " + context.getConnection().getRemoteAddress());
             }
          }
          else {
+            //new connection
+            context = connection.initContext(info);
             clientIdSet.put(clientId, context);
          }
-      }
 
-      connections.add(context.getConnection());
+         connections.add(connection);
 
-      ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
-      // do not distribute passwords in advisory messages. usernames okay
-      ConnectionInfo copy = info.copy();
-      copy.setPassword("");
-      fireAdvisory(context, topic, copy);
-      connectionInfos.put(copy.getConnectionId(), copy);
+         ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
+         // do not distribute passwords in advisory messages. usernames okay
+         ConnectionInfo copy = info.copy();
+         copy.setPassword("");
+         fireAdvisory(context, topic, copy);
 
-      // init the conn
-      addSessions(context.getConnection(), context.getConnectionState().getSessionIds());
+         // init the conn
+         addSessions(context.getConnection(), context.getConnectionState().getSessionIds());
+      }
    }
 
    private void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, Command copy) throws Exception {
@@ -338,6 +361,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
    }
 
    public BrokerId getBrokerId() {
+      // TODO: Use the Storage ID here...
       if (brokerId == null) {
          brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId());
       }
@@ -398,6 +422,40 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       return brokerName;
    }
 
+   protected ConnectionControl newConnectionControl(boolean rebalance) {
+      ConnectionControl control = new ConnectionControl();
+
+      String uri = generateMembersURI(rebalance);
+      control.setConnectedBrokers(uri);
+
+      control.setRebalanceConnection(rebalance);
+      return control;
+   }
+
+   private String generateMembersURI(boolean flip) {
+      String uri;
+      StringBuffer connectedBrokers = new StringBuffer();
+      String separator = "";
+
+      synchronized (members) {
+         if (members.size() > 0) {
+            for (TopologyMember member : members) {
+               connectedBrokers.append(separator).append(member.toURI());
+               separator = ",";
+            }
+
+            // The flip exists to guarantee even distribution of URIs when sent to the client
+            // in case of failures you won't get all the connections failing to a single server.
+            if (flip && members.size() > 1) {
+               members.addLast(members.removeFirst());
+            }
+         }
+      }
+
+      uri = connectedBrokers.toString();
+      return uri;
+   }
+
    public boolean isFaultTolerantConfiguration() {
       return false;
    }
@@ -448,11 +506,10 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
             if (destination.isQueue()) {
                OpenWireUtil.validateDestination(destination, amqSession);
             }
-            DestinationInfo destInfo = new DestinationInfo(theConn.getConext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
+            DestinationInfo destInfo = new DestinationInfo(theConn.getContext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
             this.addDestination(theConn, destInfo);
          }
 
-
          amqSession.createProducer(info);
 
          try {
@@ -466,6 +523,12 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
 
    }
 
+   public void updateConsumer(OpenWireConnection theConn, ConsumerControl consumerControl) {
+      SessionId sessionId = consumerControl.getConsumerId().getParentId();
+      AMQSession amqSession = sessions.get(sessionId);
+      amqSession.updateConsumerPrefetchSize(consumerControl.getConsumerId(), consumerControl.getPrefetch());
+   }
+
    public void addConsumer(OpenWireConnection theConn, ConsumerInfo info) throws Exception {
       // Todo: add a destination interceptors holder here (amq supports this)
       SessionId sessionId = info.getConsumerId().getParentId();
@@ -519,13 +582,23 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       return amqSession;
    }
 
-   public void removeConnection(AMQConnectionContext context, ConnectionInfo info, Throwable error) {
-      // todo roll back tx
-      this.connections.remove(context.getConnection());
-      this.connectionInfos.remove(info.getConnectionId());
-      String clientId = info.getClientId();
-      if (clientId != null) {
-         this.clientIdSet.remove(clientId);
+   public void removeConnection(OpenWireConnection connection,
+                                ConnectionInfo info,
+                                Throwable error) throws InvalidClientIDException {
+      synchronized (clientIdSet) {
+         String clientId = info.getClientId();
+         if (clientId != null) {
+            AMQConnectionContext context = this.clientIdSet.get(clientId);
+            if (context != null && context.decRefCount() == 0) {
+               //connection is still there and need to close
+               this.clientIdSet.remove(clientId);
+               connection.disconnect(error != null);
+               this.connections.remove(connection);//what's that for?
+            }
+         }
+         else {
+            throw new InvalidClientIDException("No clientID specified for connection disconnect request");
+         }
       }
    }
 
@@ -568,7 +641,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       }
 
       if (!AdvisorySupport.isAdvisoryTopic(dest)) {
-         AMQConnectionContext context = connection.getConext();
+         AMQConnectionContext context = connection.getContext();
          DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.REMOVE_OPERATION_TYPE, dest);
 
          ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest);
@@ -598,7 +671,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       }
 
       if (!AdvisorySupport.isAdvisoryTopic(dest)) {
-         AMQConnectionContext context = connection.getConext();
+         AMQConnectionContext context = connection.getContext();
          DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, dest);
 
          ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest);
@@ -720,7 +793,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
          ActiveMQMessage advisoryMessage = new ActiveMQMessage();
          advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, consumer.getId().toString());
 
-         fireAdvisory(cc.getConext(), topic, advisoryMessage, consumer.getId());
+         fireAdvisory(cc.getContext(), topic, advisoryMessage, consumer.getId());
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6a3a994e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConnectionContext.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConnectionContext.java
index a79911c..8071d04 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConnectionContext.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConnectionContext.java
@@ -17,9 +17,11 @@
 package org.apache.activemq.artemis.core.protocol.openwire.amq;
 
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
+import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.WireFormatInfo;
@@ -47,6 +49,8 @@ public class AMQConnectionContext {
    private boolean clientMaster = true;
    private ConnectionState connectionState;
    private XATransactionId xid;
+   private AtomicInteger refCount = new AtomicInteger(1);
+   private Command lastCommand;
 
    public AMQConnectionContext() {
       this.messageEvaluationContext = new MessageEvaluationContext();
@@ -248,4 +252,19 @@ public class AMQConnectionContext {
       return false;
    }
 
+   public void incRefCount() {
+      refCount.incrementAndGet();
+   }
+
+   public int decRefCount() {
+      return refCount.decrementAndGet();
+   }
+
+   public void setLastCommand(Command lastCommand) {
+      this.lastCommand = lastCommand;
+   }
+
+   public Command getLastCommand() {
+      return this.lastCommand;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6a3a994e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 7da1f3e..b0f007a 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -44,7 +44,6 @@ import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 
 public class AMQConsumer implements BrowserListener {
-
    private AMQSession session;
    private org.apache.activemq.command.ActiveMQDestination actualDest;
    private ConsumerInfo info;
@@ -52,7 +51,7 @@ public class AMQConsumer implements BrowserListener {
    private long nativeId = -1;
    private SimpleString subQueueName = null;
 
-   private final int prefetchSize;
+   private int prefetchSize;
    private AtomicInteger windowAvailable;
    private final java.util.Queue<MessageInfo> deliveringRefs = new ConcurrentLinkedQueue<>();
    private long messagePullSequence = 0;
@@ -220,7 +219,7 @@ public class AMQConsumer implements BrowserListener {
       else if (ack.isRedeliveredAck()) {
          //client tells that this message is for redlivery.
          //do nothing until poisoned.
-         n = 1;
+         n = ack.getMessageCount();
       }
       else if (ack.isPoisonAck()) {
          //send to dlq
@@ -251,7 +250,7 @@ public class AMQConsumer implements BrowserListener {
       }
       else if (ack.isDeliveredAck() || ack.isExpiredAck()) {
          //ToDo: implement with tests
-         n = 1;
+         n = ack.getMessageCount();
       }
       else {
          Iterator<MessageInfo> iter = deliveringRefs.iterator();
@@ -374,6 +373,15 @@ public class AMQConsumer implements BrowserListener {
       return actualDest;
    }
 
+   public void setPrefetchSize(int prefetchSize) {
+      this.prefetchSize = prefetchSize;
+      this.windowAvailable.set(prefetchSize);
+      this.info.setPrefetchSize(prefetchSize);
+      if (this.prefetchSize > 0) {
+         session.getCoreSession().promptDelivery(nativeId);
+      }
+   }
+
    private class MessagePullHandler {
 
       private long next = -1;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6a3a994e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
index c414319..5403830 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
@@ -143,6 +143,8 @@ public class AMQServerSession extends ServerSessionImpl {
    }
 
    //amq specific behavior
+
+   // TODO: move this to AMQSession
    public void amqRollback(Set<Long> acked) throws Exception {
       if (tx == null) {
          // Might be null if XA
@@ -218,7 +220,9 @@ public class AMQServerSession extends ServerSessionImpl {
                                         final boolean supportLargeMessage,
                                         final Integer credits) throws Exception {
       if (this.internal) {
-         //internal sessions doesn't check security
+         // Clebert TODO: PQP!!!!!!!!!!!!!!!!!!!!
+
+         //internal sessions doesn't check security:: Why??? //// what's the reason for that? Where a link?
 
          Binding binding = postOffice.getBinding(queueName);
 
@@ -309,6 +313,8 @@ public class AMQServerSession extends ServerSessionImpl {
       return queue;
    }
 
+
+   // Clebert TODO: Get rid of these mthods
    @Override
    protected void doSend(final ServerMessage msg, final boolean direct) throws Exception {
       if (!this.internal) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6a3a994e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index a388f50..701c9ce 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.Message;
@@ -62,7 +63,6 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.wireformat.WireFormat;
 
 public class AMQSession implements SessionCallback {
-
    private AMQServerSession coreSession;
    private ConnectionInfo connInfo;
    private SessionInfo sessInfo;
@@ -137,6 +137,7 @@ public class AMQSession implements SessionCallback {
             getCoreServer().getJMSQueueCreator().create(queueName);
          }
          AMQConsumer consumer = new AMQConsumer(this, d, info, scheduledPool);
+
          consumer.init();
          consumerMap.put(d, consumer);
          consumers.put(consumer.getNativeId(), consumer);
@@ -193,8 +194,15 @@ public class AMQSession implements SessionCallback {
 
    @Override
    public boolean hasCredits(ServerConsumer consumerID) {
-      AMQConsumer amqConsumer = consumers.get(consumerID.getID());
-      return amqConsumer.hasCredits();
+
+      AMQConsumer amqConsumer;
+
+      amqConsumer = consumers.get(consumerID.getID());
+
+      if (amqConsumer != null) {
+         return amqConsumer.hasCredits();
+      }
+      return false;
    }
 
    @Override
@@ -445,6 +453,16 @@ public class AMQSession implements SessionCallback {
       return consumers.get(coreConsumerId);
    }
 
+   public void updateConsumerPrefetchSize(ConsumerId consumerId, int prefetch) {
+      Iterator<AMQConsumer> iterator = consumers.values().iterator();
+      while (iterator.hasNext()) {
+         AMQConsumer consumer = iterator.next();
+         if (consumer.getId().equals(consumerId)) {
+            consumer.setPrefetchSize(prefetch);
+         }
+      }
+   }
+
    private class SendRetryTask implements Runnable {
 
       private ServerMessage coreMsg;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6a3a994e/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index d52f53f..4a24b57 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -947,4 +947,8 @@ public interface Configuration {
    StoreConfiguration getStoreConfiguration();
 
    Configuration setStoreConfiguration(StoreConfiguration storeConfiguration);
+
+   /** It will return all the connectors in a toString manner for debug purposes. */
+   String debugConnectors();
+
 }


Mime
View raw message