Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C7389200B92 for ; Wed, 28 Sep 2016 13:51:56 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id C620B160AD4; Wed, 28 Sep 2016 11:51:56 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 68E37160AB4 for ; Wed, 28 Sep 2016 13:51:55 +0200 (CEST) Received: (qmail 83824 invoked by uid 500); 28 Sep 2016 11:51:54 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 83812 invoked by uid 99); 28 Sep 2016 11:51:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Sep 2016 11:51:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7F4D4DFC7E; Wed, 28 Sep 2016 11:51:54 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ptupitsyn@apache.org To: commits@ignite.apache.org Date: Wed, 28 Sep 2016 11:51:54 -0000 Message-Id: <90d2d6f06cbe4cd18d65631593e199ba@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/8] ignite git commit: Code style fixes. archived-at: Wed, 28 Sep 2016 11:51:56 -0000 Repository: ignite Updated Branches: refs/heads/master 4ffb5860b -> ed4739235 Code style fixes. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ec9ddcd3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ec9ddcd3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ec9ddcd3 Branch: refs/heads/master Commit: ec9ddcd3d99d19403bf19e1172ede2afdab6c86f Parents: a53c399 Author: sboikov Authored: Wed Sep 28 12:05:28 2016 +0300 Committer: sboikov Committed: Wed Sep 28 12:05:28 2016 +0300 ---------------------------------------------------------------------- .../stream/jms11/IgniteJmsStreamerTest.java | 206 ++++++++++++------- .../jms11/IgniteJmsStreamerTestSuite.java | 2 +- 2 files changed, 134 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ec9ddcd3/modules/jms11/src/test/java/org/apache/ignite/stream/jms11/IgniteJmsStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/jms11/src/test/java/org/apache/ignite/stream/jms11/IgniteJmsStreamerTest.java b/modules/jms11/src/test/java/org/apache/ignite/stream/jms11/IgniteJmsStreamerTest.java index 238d939..290185e 100644 --- a/modules/jms11/src/test/java/org/apache/ignite/stream/jms11/IgniteJmsStreamerTest.java +++ b/modules/jms11/src/test/java/org/apache/ignite/stream/jms11/IgniteJmsStreamerTest.java @@ -59,26 +59,39 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT; * @author Raul Kripalani */ public class IgniteJmsStreamerTest extends GridCommonAbstractTest { - + /** */ private static final int CACHE_ENTRY_COUNT = 100; + + /** */ private static final String QUEUE_NAME = "ignite.test.queue"; + + /** */ private static final String TOPIC_NAME = "ignite.test.topic"; + + /** */ private static final Map TEST_DATA = new HashMap<>(); + /** */ + private BrokerService broker; + + /** */ + private ConnectionFactory connFactory; + static { for (int i = 1; i <= CACHE_ENTRY_COUNT; i++) TEST_DATA.put(Integer.toString(i), "v" + i); } - private BrokerService broker; - private ConnectionFactory connectionFactory; - /** Constructor. */ public IgniteJmsStreamerTest() { super(true); } - @Before @SuppressWarnings("unchecked") + /** + * @throws Exception If failed. + */ + @Before + @SuppressWarnings("unchecked") public void beforeTest() throws Exception { grid().getOrCreateCache(defaultCacheConfiguration()); @@ -88,21 +101,24 @@ public class IgniteJmsStreamerTest extends GridCommonAbstractTest { broker.setPersistenceAdapter(null); broker.setPersistenceFactory(null); - PolicyMap policyMap = new PolicyMap(); - PolicyEntry policy = new PolicyEntry(); + PolicyMap plcMap = new PolicyMap(); + PolicyEntry plc = new PolicyEntry(); - policy.setQueuePrefetch(1); + plc.setQueuePrefetch(1); - broker.setDestinationPolicy(policyMap); - broker.getDestinationPolicy().setDefaultEntry(policy); + broker.setDestinationPolicy(plcMap); + broker.getDestinationPolicy().setDefaultEntry(plc); broker.setSchedulerSupport(false); broker.start(true); - connectionFactory = new ActiveMQConnectionFactory(BrokerRegistry.getInstance().findFirst().getVmConnectorURI()); - + connFactory = new ActiveMQConnectionFactory(BrokerRegistry.getInstance().findFirst().getVmConnectorURI()); } + /** + * + * @throws Exception Iff ailed. + */ @After public void afterTest() throws Exception { grid().cache(null).clear(); @@ -111,11 +127,14 @@ public class IgniteJmsStreamerTest extends GridCommonAbstractTest { broker.deleteAllMessages(); } + /** + * @throws Exception If failed. + */ public void testQueueFromName() throws Exception { - Destination destination = new ActiveMQQueue(QUEUE_NAME); + Destination dest = new ActiveMQQueue(QUEUE_NAME); // produce messages into the queue - produceObjectMessages(destination, false); + produceObjectMessages(dest, false); try (IgniteDataStreamer dataStreamer = grid().dataStreamer(null)) { JmsStreamer jmsStreamer = newJmsStreamer(ObjectMessage.class, dataStreamer); @@ -137,8 +156,11 @@ public class IgniteJmsStreamerTest extends GridCommonAbstractTest { } + /** + * @throws Exception If failed. + */ public void testTopicFromName() throws JMSException, InterruptedException { - Destination destination = new ActiveMQTopic(TOPIC_NAME); + Destination dest = new ActiveMQTopic(TOPIC_NAME); // should not produced messages until subscribed to the topic; otherwise they will be missed because this is not // a durable subscriber (for which a dedicated test exists) @@ -154,7 +176,7 @@ public class IgniteJmsStreamerTest extends GridCommonAbstractTest { jmsStreamer.start(); // produce messages - produceObjectMessages(destination, false); + produceObjectMessages(dest, false); // all cache PUT events received in 10 seconds latch.await(10, TimeUnit.SECONDS); @@ -166,15 +188,18 @@ public class IgniteJmsStreamerTest extends GridCommonAbstractTest { } + /** + * @throws Exception If failed. + */ public void testQueueFromExplicitDestination() throws Exception { - Destination destination = new ActiveMQQueue(QUEUE_NAME); + Destination dest = new ActiveMQQueue(QUEUE_NAME); // produce messages into the queue - produceObjectMessages(destination, false); + produceObjectMessages(dest, false); try (IgniteDataStreamer dataStreamer = grid().dataStreamer(null)) { JmsStreamer jmsStreamer = newJmsStreamer(ObjectMessage.class, dataStreamer); - jmsStreamer.setDestination(destination); + jmsStreamer.setDestination(dest); // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT); @@ -192,15 +217,18 @@ public class IgniteJmsStreamerTest extends GridCommonAbstractTest { } + /** + * @throws Exception If failed. + */ public void testTopicFromExplicitDestination() throws JMSException, InterruptedException { - Destination destination = new ActiveMQTopic(TOPIC_NAME); + Destination dest = new ActiveMQTopic(TOPIC_NAME); // should not produced messages until subscribed to the topic; otherwise they will be missed because this is not // a durable subscriber (for which a dedicated test exists) try (IgniteDataStreamer dataStreamer = grid().dataStreamer(null)) { JmsStreamer jmsStreamer = newJmsStreamer(ObjectMessage.class, dataStreamer); - jmsStreamer.setDestination(destination); + jmsStreamer.setDestination(dest); // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT); @@ -208,7 +236,7 @@ public class IgniteJmsStreamerTest extends GridCommonAbstractTest { jmsStreamer.start(); // produce messages - produceObjectMessages(destination, false); + produceObjectMessages(dest, false); // all cache PUT events received in 10 seconds latch.await(10, TimeUnit.SECONDS); @@ -220,15 +248,18 @@ public class IgniteJmsStreamerTest extends GridCommonAbstractTest { } + /** + * @throws Exception If failed. + */ public void testInsertMultipleCacheEntriesFromOneMessage() throws Exception { - Destination destination = new ActiveMQQueue(QUEUE_NAME); + Destination dest = new ActiveMQQueue(QUEUE_NAME); // produce A SINGLE MESSAGE, containing all data, into the queue - produceStringMessages(destination, true); + produceStringMessages(dest, true); try (IgniteDataStreamer dataStreamer = grid().dataStreamer(null)) { JmsStreamer jmsStreamer = newJmsStreamer(TextMessage.class, dataStreamer); - jmsStreamer.setDestination(destination); + jmsStreamer.setDestination(dest); // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT); @@ -245,12 +276,15 @@ public class IgniteJmsStreamerTest extends GridCommonAbstractTest { } + /** + * @throws Exception If failed. + */ public void testDurableSubscriberStartStopStart() throws Exception { - Destination destination = new ActiveMQTopic(TOPIC_NAME); + Destination dest = new ActiveMQTopic(TOPIC_NAME); try (IgniteDataStreamer dataStreamer = grid().dataStreamer(null)) { JmsStreamer jmsStreamer = newJmsStreamer(TextMessage.class, dataStreamer); - jmsStreamer.setDestination(destination); + jmsStreamer.setDestination(dest); jmsStreamer.setDurableSubscription(true); jmsStreamer.setClientId(Long.toString(System.currentTimeMillis())); jmsStreamer.setDurableSubscriptionName("ignite-test-durable"); @@ -265,7 +299,7 @@ public class IgniteJmsStreamerTest extends GridCommonAbstractTest { assertEquals(0, broker.getCurrentConnections()); // we send messages while we're still away - produceStringMessages(destination, false); + produceStringMessages(dest, false); // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT); @@ -282,15 +316,18 @@ public class IgniteJmsStreamerTest extends GridCommonAbstractTest { } + /** + * @throws Exception If failed. + */ public void testQueueMessagesConsumedInBatchesCompletionSizeBased() throws Exception { - Destination destination = new ActiveMQQueue(QUEUE_NAME); + Destination dest = new ActiveMQQueue(QUEUE_NAME); // produce multiple messages into the queue - produceStringMessages(destination, false); + produceStringMessages(dest, false); try (IgniteDataStreamer dataStreamer = grid().dataStreamer(null)) { JmsStreamer jmsStreamer = newJmsStreamer(TextMessage.class, dataStreamer); - jmsStreamer.setDestination(destination); + jmsStreamer.setDestination(dest); jmsStreamer.setBatched(true); jmsStreamer.setBatchClosureSize(99); @@ -309,7 +346,7 @@ public class IgniteJmsStreamerTest extends GridCommonAbstractTest { // we expect all entries to be loaded, but still one (uncommitted) message should remain in the queue // as observed by the broker - DestinationStatistics qStats = broker.getBroker().getDestinationMap().get(destination).getDestinationStatistics(); + DestinationStatistics qStats = broker.getBroker().getDestinationMap().get(dest).getDestinationStatistics(); assertEquals(1, qStats.getMessages().getCount()); assertEquals(1, qStats.getInflight().getCount()); @@ -318,15 +355,18 @@ public class IgniteJmsStreamerTest extends GridCommonAbstractTest { } + /** + * @throws Exception If failed. + */ public void testQueueMessagesConsumedInBatchesCompletionTimeBased() throws Exception { - Destination destination = new ActiveMQQueue(QUEUE_NAME); + Destination dest = new ActiveMQQueue(QUEUE_NAME); // produce multiple messages into the queue - produceStringMessages(destination, false); + produceStringMessages(dest, false); try (IgniteDataStreamer dataStreamer = grid().dataStreamer(null)) { JmsStreamer jmsStreamer = newJmsStreamer(TextMessage.class, dataStreamer); - jmsStreamer.setDestination(destination); + jmsStreamer.setDestination(dest); jmsStreamer.setBatched(true); jmsStreamer.setBatchClosureMillis(2000); // disable size-based session commits @@ -334,7 +374,7 @@ public class IgniteJmsStreamerTest extends GridCommonAbstractTest { // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT); - DestinationStatistics qStats = broker.getBroker().getDestinationMap().get(destination).getDestinationStatistics(); + DestinationStatistics qStats = broker.getBroker().getDestinationMap().get(dest).getDestinationStatistics(); jmsStreamer.start(); @@ -365,17 +405,20 @@ public class IgniteJmsStreamerTest extends GridCommonAbstractTest { } + /** + * @throws Exception If failed. + */ public void testGenerateNoEntries() throws Exception { - Destination destination = new ActiveMQQueue(QUEUE_NAME); + Destination dest = new ActiveMQQueue(QUEUE_NAME); // produce multiple messages into the queue - produceStringMessages(destination, false); + produceStringMessages(dest, false); try (IgniteDataStreamer dataStreamer = grid().dataStreamer(null)) { JmsStreamer jmsStreamer = newJmsStreamer(TextMessage.class, dataStreamer); // override the transformer with one that generates no cache entries jmsStreamer.setTransformer(TestTransformers.generateNoEntries()); - jmsStreamer.setDestination(destination); + jmsStreamer.setDestination(dest); // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT CountDownLatch latch = subscribeToPutEvents(1); @@ -390,16 +433,19 @@ public class IgniteJmsStreamerTest extends GridCommonAbstractTest { } + /** + * @throws Exception If failed. + */ public void testTransactedSessionNoBatching() throws Exception { - Destination destination = new ActiveMQQueue(QUEUE_NAME); + Destination dest = new ActiveMQQueue(QUEUE_NAME); // produce multiple messages into the queue - produceStringMessages(destination, false); + produceStringMessages(dest, false); try (IgniteDataStreamer dataStreamer = grid().dataStreamer(null)) { JmsStreamer jmsStreamer = newJmsStreamer(TextMessage.class, dataStreamer); jmsStreamer.setTransacted(true); - jmsStreamer.setDestination(destination); + jmsStreamer.setDestination(dest); // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT); @@ -416,15 +462,18 @@ public class IgniteJmsStreamerTest extends GridCommonAbstractTest { } + /** + * @throws Exception If failed. + */ public void testQueueMultipleThreads() throws Exception { - Destination destination = new ActiveMQQueue(QUEUE_NAME); + Destination dest = new ActiveMQQueue(QUEUE_NAME); // produce messages into the queue - produceObjectMessages(destination, false); + produceObjectMessages(dest, false); try (IgniteDataStreamer dataStreamer = grid().dataStreamer(null)) { JmsStreamer jmsStreamer = newJmsStreamer(ObjectMessage.class, dataStreamer); - jmsStreamer.setDestination(destination); + jmsStreamer.setDestination(dest); jmsStreamer.setThreads(5); // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT @@ -433,14 +482,14 @@ public class IgniteJmsStreamerTest extends GridCommonAbstractTest { // start the streamer jmsStreamer.start(); - DestinationStatistics qStats = broker.getBroker().getDestinationMap().get(destination).getDestinationStatistics(); + DestinationStatistics qStats = broker.getBroker().getDestinationMap().get(dest).getDestinationStatistics(); assertEquals(5, qStats.getConsumers().getCount()); // all cache PUT events received in 10 seconds latch.await(10, TimeUnit.SECONDS); // assert that all consumers received messages - given that the prefetch is 1 - for (Subscription subscription : broker.getBroker().getDestinationMap().get(destination).getConsumers()) + for (Subscription subscription : broker.getBroker().getDestinationMap().get(dest).getConsumers()) assertTrue(subscription.getDequeueCounter() > 0); assertAllCacheEntriesLoaded(); @@ -450,6 +499,9 @@ public class IgniteJmsStreamerTest extends GridCommonAbstractTest { } + /** + * + */ private void assertAllCacheEntriesLoaded() { // Get the cache and check that the entries are present IgniteCache cache = grid().cache(null); @@ -464,39 +516,44 @@ public class IgniteJmsStreamerTest extends GridCommonAbstractTest { JmsStreamer jmsStreamer = new JmsStreamer<>(); jmsStreamer.setIgnite(grid()); jmsStreamer.setStreamer(dataStreamer); - jmsStreamer.setConnectionFactory(connectionFactory); + jmsStreamer.setConnectionFactory(connFactory); - if (type == ObjectMessage.class) { + if (type == ObjectMessage.class) jmsStreamer.setTransformer((MessageTransformer) TestTransformers.forObjectMessage()); - } - else { + else jmsStreamer.setTransformer((MessageTransformer) TestTransformers.forTextMessage()); - } dataStreamer.allowOverwrite(true); dataStreamer.autoFlushFrequency(10); return jmsStreamer; } + /** + * @param expect Expected events number. + * @return Event receive latch. + */ private CountDownLatch subscribeToPutEvents(int expect) { Ignite ignite = grid(); // Listen to cache PUT events and expect as many as messages as test data items final CountDownLatch latch = new CountDownLatch(expect); - @SuppressWarnings("serial") IgniteBiPredicate callback = new IgniteBiPredicate() { + + @SuppressWarnings("serial") IgniteBiPredicate cb = new IgniteBiPredicate() { @Override public boolean apply(UUID uuid, CacheEvent evt) { latch.countDown(); return true; } }; - ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(callback, null, EVT_CACHE_OBJECT_PUT); + ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(cb, null, EVT_CACHE_OBJECT_PUT); return latch; } - private void produceObjectMessages(Destination destination, boolean singleMessage) throws JMSException { - Session session = connectionFactory.createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer mp = session.createProducer(destination); + private void produceObjectMessages(Destination dest, boolean singleMsg) throws JMSException { + Session ses = connFactory.createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer mp = ses.createProducer(dest); + HashSet set = new HashSet<>(); for (String key : TEST_DATA.keySet()) { @@ -505,20 +562,21 @@ public class IgniteJmsStreamerTest extends GridCommonAbstractTest { } int messagesSent; - if (singleMessage) { - mp.send(session.createObjectMessage(set)); + + if (singleMsg) { + mp.send(ses.createObjectMessage(set)); messagesSent = 1; } else { for (TestTransformers.TestObject to : set) - mp.send(session.createObjectMessage(to)); + mp.send(ses.createObjectMessage(to)); messagesSent = set.size(); } - if (destination instanceof Queue) { + if (dest instanceof Queue) { try { - assertEquals(messagesSent, broker.getBroker().getDestinationMap().get(destination) + assertEquals(messagesSent, broker.getBroker().getDestinationMap().get(dest) .getDestinationStatistics().getMessages().getCount()); } catch (Exception e) { @@ -528,36 +586,39 @@ public class IgniteJmsStreamerTest extends GridCommonAbstractTest { } - private void produceStringMessages(Destination destination, boolean singleMessage) throws JMSException { - Session session = connectionFactory.createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer mp = session.createProducer(destination); + private void produceStringMessages(Destination dest, boolean singleMsg) throws JMSException { + Session ses = connFactory.createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer mp = ses.createProducer(dest); + HashSet set = new HashSet<>(); for (String key : TEST_DATA.keySet()) set.add(key + "," + TEST_DATA.get(key)); int messagesSent; - if (singleMessage) { + + if (singleMsg) { StringBuilder sb = new StringBuilder(); for (String s : set) sb.append(s).append("|"); sb.deleteCharAt(sb.length() - 1); - mp.send(session.createTextMessage(sb.toString())); + mp.send(ses.createTextMessage(sb.toString())); messagesSent = 1; } else { - for (String s : set) { - mp.send(session.createTextMessage(s)); - } + for (String s : set) + mp.send(ses.createTextMessage(s)); + messagesSent = set.size(); } - if (destination instanceof Queue) { + if (dest instanceof Queue) { try { - assertEquals(messagesSent, broker.getBroker().getDestinationMap().get(destination) + assertEquals(messagesSent, broker.getBroker().getDestinationMap().get(dest) .getDestinationStatistics().getMessages().getCount()); } catch (Exception e) { @@ -566,5 +627,4 @@ public class IgniteJmsStreamerTest extends GridCommonAbstractTest { } } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ec9ddcd3/modules/jms11/src/test/java/org/apache/ignite/stream/jms11/IgniteJmsStreamerTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/jms11/src/test/java/org/apache/ignite/stream/jms11/IgniteJmsStreamerTestSuite.java b/modules/jms11/src/test/java/org/apache/ignite/stream/jms11/IgniteJmsStreamerTestSuite.java index e299f04..071ff9b 100644 --- a/modules/jms11/src/test/java/org/apache/ignite/stream/jms11/IgniteJmsStreamerTestSuite.java +++ b/modules/jms11/src/test/java/org/apache/ignite/stream/jms11/IgniteJmsStreamerTestSuite.java @@ -30,5 +30,5 @@ import org.junit.runners.*; IgniteJmsStreamerTest.class }) public class IgniteJmsStreamerTestSuite { - + // No-op. } \ No newline at end of file