ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [17/51] [abbrv] ignite git commit: IGNITE-4539 - Added RocketMQ integration
Date Wed, 26 Apr 2017 17:38:26 GMT
IGNITE-4539 - Added RocketMQ integration


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/335f2431
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/335f2431
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/335f2431

Branch: refs/heads/ignite-5075-cacheStart
Commit: 335f24317da9f0083834bb9e34622b85d282f42b
Parents: ee1b19d
Author: Roman Shtykh <rshtykh@yahoo.com>
Authored: Wed Apr 26 14:24:20 2017 +0300
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Wed Apr 26 14:25:12 2017 +0300

----------------------------------------------------------------------
 modules/rocketmq/README.txt                     |  25 +++
 modules/rocketmq/pom.xml                        |  81 +++++++
 .../stream/rocketmq/RocketMQStreamer.java       | 151 +++++++++++++
 .../ignite/stream/rocketmq/package-info.java    |  21 ++
 .../stream/rocketmq/RocketMQStreamerTest.java   | 214 +++++++++++++++++++
 .../rocketmq/RocketMQStreamerTestSuite.java     |  37 ++++
 .../stream/rocketmq/TestRocketMQServer.java     | 148 +++++++++++++
 .../ignite/stream/rocketmq/package-info.java    |  21 ++
 parent/pom.xml                                  |   5 +
 pom.xml                                         |   1 +
 10 files changed, 704 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/335f2431/modules/rocketmq/README.txt
----------------------------------------------------------------------
diff --git a/modules/rocketmq/README.txt b/modules/rocketmq/README.txt
new file mode 100644
index 0000000..55a117b
--- /dev/null
+++ b/modules/rocketmq/README.txt
@@ -0,0 +1,25 @@
+Apache Ignite RocketMQ Streamer Module
+--------------------------------------
+
+Apache Ignite RocketMQ Streamer module provides streaming from RocketMQ to Ignite cache.
+
+To use Ignite RocketMQ Streamer module, first import it to your Maven project.
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                        http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    ...
+    <dependencies>
+        ...
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-rocketmq</artifactId>
+            <version>${ignite.version}</version>
+        </dependency>
+        ...
+    </dependencies>
+    ...
+</project>
+
+Then, initialize and start it as, for instance, done in RocketMQStreamerTest.java.

http://git-wip-us.apache.org/repos/asf/ignite/blob/335f2431/modules/rocketmq/pom.xml
----------------------------------------------------------------------
diff --git a/modules/rocketmq/pom.xml b/modules/rocketmq/pom.xml
new file mode 100644
index 0000000..3b317fa
--- /dev/null
+++ b/modules/rocketmq/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+    POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.ignite</groupId>
+        <artifactId>ignite-parent</artifactId>
+        <version>1</version>
+        <relativePath>../../parent</relativePath>
+    </parent>
+
+    <artifactId>ignite-rocketmq</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+    <url>http://ignite.apache.org</url>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-namesrv</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-broker</artifactId>
+            <version>${rocketmq.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-spring</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-log4j</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/ignite/blob/335f2431/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/RocketMQStreamer.java
----------------------------------------------------------------------
diff --git a/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/RocketMQStreamer.java
b/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/RocketMQStreamer.java
new file mode 100644
index 0000000..67f1ce5
--- /dev/null
+++ b/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/RocketMQStreamer.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.rocketmq;
+
+import java.util.List;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.stream.StreamAdapter;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+
+/**
+ * Streamer that subscribes to a RocketMQ topic amd feeds messages into {@link IgniteDataStreamer}
instance.
+ */
+public class RocketMQStreamer<K, V> extends StreamAdapter<List<MessageExt>,
K, V> implements MessageListenerConcurrently {
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** RocketMQ consumer. */
+    private DefaultMQPushConsumer consumer;
+
+    /** State. */
+    private volatile boolean stopped = true;
+
+    /** Topic to subscribe to. */
+    private String topic;
+
+    /** Consumer group. */
+    private String consumerGrp;
+
+    /** Name server address. */
+    private String nameSrvAddr;
+
+    /**
+     * Starts streamer.
+     *
+     * @throws IgniteException If failed.
+     */
+    public void start() {
+        if (!stopped)
+            throw new IgniteException("Attempted to start an already started RocketMQ streamer");
+
+        // validate parameters.
+        A.notNull(getStreamer(), "streamer");
+        A.notNull(getIgnite(), "ignite");
+        A.notNull(topic, "topic");
+        A.notNull(consumerGrp, "consumer group");
+        A.notNullOrEmpty(nameSrvAddr, "nameserver address");
+        A.ensure(null != getMultipleTupleExtractor(), "Multiple tuple extractor must be configured");
+
+        log = getIgnite().log();
+
+        consumer = new DefaultMQPushConsumer(consumerGrp);
+
+        consumer.setNamesrvAddr(nameSrvAddr);
+        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+
+        try {
+            consumer.subscribe(topic, "*");
+        }
+        catch (MQClientException e) {
+            throw new IgniteException("Failed to subscribe to " + topic, e);
+        }
+
+        consumer.registerMessageListener(this);
+
+        try {
+            consumer.start();
+        }
+        catch (MQClientException e) {
+            throw new IgniteException("Failed to start the streamer", e);
+        }
+
+        stopped = false;
+    }
+
+    /**
+     * Stops streamer.
+     */
+    public void stop() {
+        if (consumer != null)
+            consumer.shutdown();
+
+        stopped = true;
+    }
+
+    /**
+     * Implements {@link MessageListenerConcurrently#consumeMessage(List, ConsumeConcurrentlyContext)}
to receive
+     * messages.
+     *
+     * {@inheritDoc}
+     */
+    @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+        ConsumeConcurrentlyContext context) {
+        if (log.isDebugEnabled())
+            log.debug("Received " + msgs.size() + " messages");
+
+        addMessage(msgs);
+
+        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+    }
+
+    /**
+     * Sets the topic to subscribe to.
+     *
+     * @param topic The topic to subscribe to.
+     */
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    /**
+     * Sets the name of the consumer group.
+     *
+     * @param consumerGrp Consumer group name.
+     */
+    public void setConsumerGrp(String consumerGrp) {
+        this.consumerGrp = consumerGrp;
+    }
+
+    /**
+     * Sets the name server address.
+     *
+     * @param nameSrvAddr Name server address
+     */
+    public void setNameSrvAddr(String nameSrvAddr) {
+        this.nameSrvAddr = nameSrvAddr;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/335f2431/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/package-info.java
----------------------------------------------------------------------
diff --git a/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/package-info.java
b/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/package-info.java
new file mode 100644
index 0000000..f743696
--- /dev/null
+++ b/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/package-info.java
@@ -0,0 +1,21 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * Contains implementation of RocketMQStreamer tests.
+ */
+package org.apache.ignite.stream.rocketmq;

http://git-wip-us.apache.org/repos/asf/ignite/blob/335f2431/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTest.java
b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTest.java
new file mode 100644
index 0000000..59451e9
--- /dev/null
+++ b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTest.java
@@ -0,0 +1,214 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.stream.rocketmq;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.stream.StreamMultipleTupleExtractor;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+import static org.apache.ignite.stream.rocketmq.TestRocketMQServer.NAME_SERVER_PORT;
+import static org.apache.ignite.stream.rocketmq.TestRocketMQServer.TEST_IP;
+
+/**
+ * Test for {@link RocketMQStreamer}.
+ */
+public class RocketMQStreamerTest extends GridCommonAbstractTest {
+    /** Test topic. */
+    private static final String TOPIC_NAME = "testTopic";
+
+    /** Test consumer group. */
+    private static final String CONSUMER_GRP = "testConsumerGrp";
+
+    /** Test server. */
+    private static TestRocketMQServer testRocketMQServer;
+
+    /** Number of events to handle. */
+    private static final int EVT_NUM = 1000;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected void beforeTest() throws Exception {
+        grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        grid().cache(null).clear();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected void beforeTestsStarted() throws Exception {
+        testRocketMQServer = new TestRocketMQServer(log);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        if (testRocketMQServer != null)
+            testRocketMQServer.shutdown();
+    }
+
+    /** Constructor. */
+    public RocketMQStreamerTest() {
+        super(true);
+    }
+
+    /**
+     * Tests data is properly injected into the grid.
+     *
+     * @throws Exception If fails.
+     */
+    public void testStreamer() throws Exception {
+        RocketMQStreamer<String, byte[]> streamer = null;
+
+        Ignite ignite = grid();
+
+        try (IgniteDataStreamer<String, byte[]> dataStreamer = ignite.dataStreamer(null))
{
+            dataStreamer.allowOverwrite(true);
+            dataStreamer.autoFlushFrequency(10);
+
+            streamer = new RocketMQStreamer<>();
+
+            //configure.
+            streamer.setIgnite(ignite);
+            streamer.setStreamer(dataStreamer);
+            streamer.setNameSrvAddr(TEST_IP + ":" + NAME_SERVER_PORT);
+            streamer.setConsumerGrp(CONSUMER_GRP);
+            streamer.setTopic(TOPIC_NAME);
+            streamer.setMultipleTupleExtractor(new TestTupleExtractor());
+
+            streamer.start();
+
+            IgniteCache<String, String> cache = ignite.cache(null);
+
+            assertEquals(0, cache.size(CachePeekMode.PRIMARY));
+
+            final CountDownLatch latch = new CountDownLatch(EVT_NUM);
+
+            IgniteBiPredicate<UUID, CacheEvent> putLsnr = new IgniteBiPredicate<UUID,
CacheEvent>() {
+                @Override public boolean apply(UUID uuid, CacheEvent evt) {
+                    assert evt != null;
+
+                    latch.countDown();
+
+                    return true;
+                }
+            };
+
+            ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(putLsnr, null,
EVT_CACHE_OBJECT_PUT);
+
+            produceData();
+
+            assertTrue(latch.await(30, TimeUnit.SECONDS));
+
+            assertEquals(EVT_NUM, cache.size(CachePeekMode.PRIMARY));
+        }
+        finally {
+            if (streamer != null)
+                streamer.stop();
+        }
+    }
+
+    /**
+     * Test tuple extractor.
+     */
+    public static class TestTupleExtractor implements StreamMultipleTupleExtractor<List<MessageExt>,
String, byte[]> {
+
+        /** {@inheritDoc} */
+        @Override public Map<String, byte[]> extract(List<MessageExt> msgs) {
+            final Map<String, byte[]> map = new HashMap<>();
+
+            for (MessageExt msg : msgs)
+                map.put(msg.getMsgId(), msg.getBody());
+
+            return map;
+        }
+    }
+
+    /**
+     * Adds data to RocketMQ.
+     *
+     * @throws Exception If fails.
+     */
+    private void produceData() throws Exception {
+        initTopic(TOPIC_NAME, TEST_IP + ":" + NAME_SERVER_PORT);
+
+        DefaultMQProducer producer = new DefaultMQProducer("testProducerGrp");
+
+        producer.setNamesrvAddr(TEST_IP + ":" + NAME_SERVER_PORT);
+
+        try {
+            producer.start();
+
+            for (int i = 0; i < EVT_NUM; i++)
+                producer.send(new Message(TOPIC_NAME, "", String.valueOf(i).getBytes("UTF-8")));
+        }
+        catch (Exception e) {
+            throw new Exception(e);
+        }
+        finally {
+            producer.shutdown();
+        }
+    }
+
+    /**
+     * Initializes RocketMQ topic.
+     *
+     * @param topic Topic.
+     * @param nsAddr Nameserver address.
+     * @throws IgniteInterruptedCheckedException If fails.
+     */
+    private void initTopic(String topic, String nsAddr) throws Exception {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExt.setNamesrvAddr(nsAddr);
+        try {
+            defaultMQAdminExt.start();
+
+            TopicConfig topicConfig = new TopicConfig();
+            topicConfig.setTopicName(topic);
+            topicConfig.setReadQueueNums(4);
+            topicConfig.setWriteQueueNums(4);
+
+            defaultMQAdminExt.createAndUpdateTopicConfig(testRocketMQServer.getBrokerAddr(),
topicConfig);
+
+            U.sleep(100);
+        }
+        finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/335f2431/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTestSuite.java
b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTestSuite.java
new file mode 100644
index 0000000..e761f1b
--- /dev/null
+++ b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTestSuite.java
@@ -0,0 +1,37 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.stream.rocketmq;
+
+import junit.framework.TestSuite;
+
+/**
+ * Apache RocketMQ streamers tests.
+ */
+public class RocketMQStreamerTestSuite extends TestSuite {
+    /**
+     * @return Test suite.
+     * @throws Exception Thrown in case of the failure.
+     */
+    public static TestSuite suite() throws Exception {
+        TestSuite suite = new TestSuite("Apache Kafka streamer Test Suite");
+
+        suite.addTest(new TestSuite(RocketMQStreamerTest.class));
+
+        return suite;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/335f2431/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/TestRocketMQServer.java
----------------------------------------------------------------------
diff --git a/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/TestRocketMQServer.java
b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/TestRocketMQServer.java
new file mode 100644
index 0000000..beece8e
--- /dev/null
+++ b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/TestRocketMQServer.java
@@ -0,0 +1,148 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.stream.rocketmq;
+
+import java.util.UUID;
+import org.apache.ignite.IgniteLogger;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+import static java.io.File.separator;
+
+/**
+ * Test RocketMQ server handling a broker and a nameserver.
+ */
+class TestRocketMQServer {
+    /** Nameserver port. */
+    protected static final int NAME_SERVER_PORT = 9000;
+
+    /** Broker port. */
+    private static final int BROKER_PORT = 8000;
+
+    /** Broker HA port. */
+    private static final int HA_PORT = 8001;
+
+    /** Test ip address. */
+    protected static final String TEST_IP = "127.0.0.1";
+
+    /** Test broker name. */
+    private static final String TEST_BROKER = "testBroker";
+
+    /** Test cluster name. */
+    private static final String TEST_CLUSTER = "testCluster";
+
+    /** Nameserver. */
+    private static NamesrvController nameSrv;
+
+    /** Broker. */
+    private static BrokerController broker;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /**
+     * Test server constructor.
+     *
+     * @param log Logger.
+     */
+    TestRocketMQServer(IgniteLogger log) {
+        this.log = log;
+
+        try {
+            startNameServer();
+            startBroker();
+        }
+        catch (Exception e) {
+            throw new RuntimeException("Failed to start RocketMQ: " + e);
+        }
+    }
+
+    /**
+     * Starts a test nameserver.
+     *
+     * @throws Exception If fails.
+     */
+    private void startNameServer() throws Exception {
+        NamesrvConfig namesrvConfig = new NamesrvConfig();
+        NettyServerConfig nameServerNettyServerConfig = new NettyServerConfig();
+
+        namesrvConfig.setKvConfigPath(System.getProperty("java.io.tmpdir") + separator +
"namesrv" + separator + "kvConfig.json");
+        nameServerNettyServerConfig.setListenPort(NAME_SERVER_PORT);
+
+        nameSrv = new NamesrvController(namesrvConfig, nameServerNettyServerConfig);
+
+        nameSrv.initialize();
+        nameSrv.start();
+
+        log.info("Started nameserver at " + NAME_SERVER_PORT);
+    }
+
+    /**
+     * Starts a test broker.
+     *
+     * @throws Exception If fails.
+     */
+    private void startBroker() throws Exception {
+        BrokerConfig brokerCfg = new BrokerConfig();
+        NettyServerConfig nettySrvCfg = new NettyServerConfig();
+        MessageStoreConfig storeCfg = new MessageStoreConfig();
+
+        brokerCfg.setBrokerName(TEST_BROKER);
+        brokerCfg.setBrokerClusterName(TEST_CLUSTER);
+        brokerCfg.setBrokerIP1(TEST_IP);
+        brokerCfg.setNamesrvAddr(TEST_IP + ":" + NAME_SERVER_PORT);
+
+        storeCfg.setStorePathRootDir(System.getProperty("java.io.tmpdir") + separator + "store-"
+ UUID.randomUUID());
+        storeCfg.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + separator +
"commitlog");
+        storeCfg.setHaListenPort(HA_PORT);
+
+        nettySrvCfg.setListenPort(BROKER_PORT);
+
+        broker = new BrokerController(brokerCfg, nettySrvCfg, new NettyClientConfig(), storeCfg);
+
+        broker.initialize();
+        broker.start();
+
+        log.info("Started broker [" + TEST_BROKER + "] at " + BROKER_PORT);
+    }
+
+    /**
+     * Obtains the broker address.
+     *
+     * @return Broker address.
+     */
+    String getBrokerAddr() {
+        return broker.getBrokerAddr();
+    }
+
+    /**
+     * Shuts test server down.
+     */
+    void shutdown() {
+        if (broker != null)
+            broker.shutdown();
+
+        if (nameSrv != null)
+            nameSrv.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/335f2431/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/package-info.java
----------------------------------------------------------------------
diff --git a/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/package-info.java
b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/package-info.java
new file mode 100644
index 0000000..eebf084
--- /dev/null
+++ b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/package-info.java
@@ -0,0 +1,21 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * Contains implementation of RocketMQStreamer.
+ */
+package org.apache.ignite.stream.rocketmq;

http://git-wip-us.apache.org/repos/asf/ignite/blob/335f2431/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 3d0f413..a59d8c9 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -94,6 +94,7 @@
         <osgi.core.version>5.0.0</osgi.core.version>
         <osgi.enterprise.version>5.0.0</osgi.enterprise.version>
         <paho.version>1.0.2</paho.version>
+        <rocketmq.version>4.0.0-incubating</rocketmq.version>
         <scala210.jline.version>2.10.4</scala210.jline.version>
         <scala210.library.version>2.10.4</scala210.library.version>
         <scala211.library.version>2.11.7</scala211.library.version>
@@ -449,6 +450,10 @@
                                 <title>SpringData integration</title>
                                 <packages>org.apache.ignite.springdata.repository*</packages>
                             </group>
+                            <group>
+                                <title>RocketMQ integration</title>
+                                <packages>org.apache.ignite.rocketmq*</packages>
+                            </group>
                         </groups>
                         <header>
                             <![CDATA[

http://git-wip-us.apache.org/repos/asf/ignite/blob/335f2431/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8a05342..9c6a0d7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -92,6 +92,7 @@
         <module>modules/kubernetes</module>
         <module>modules/zeromq</module>
         <module>modules/hibernate-core</module>
+        <module>modules/rocketmq</module>
     </modules>
 
     <profiles>


Mime
View raw message