ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dma...@apache.org
Subject [04/29] ignite git commit: IGNITE-1790 Implement Apache Camel streamer.
Date Fri, 20 Nov 2015 08:15:37 GMT
IGNITE-1790 Implement Apache Camel streamer.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c490de38
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c490de38
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c490de38

Branch: refs/heads/ignite-801
Commit: c490de38c7b841fe51fec1001368bda096e82100
Parents: 175b7f2
Author: Raul Kripalani <raulk@apache.org>
Authored: Wed Nov 18 18:03:23 2015 +0000
Committer: Raul Kripalani <raulk@apache.org>
Committed: Wed Nov 18 18:03:23 2015 +0000

----------------------------------------------------------------------
 modules/camel/pom.xml                           | 102 +++++
 .../ignite/stream/camel/CamelStreamer.java      | 237 +++++++++++
 .../stream/camel/IgniteCamelStreamerTest.java   | 420 +++++++++++++++++++
 .../camel/IgniteCamelStreamerTestSuite.java     |  48 +++
 .../src/test/resources/camel.test.properties    |  18 +
 .../org/apache/ignite/stream/StreamAdapter.java |  19 +-
 pom.xml                                         |   1 +
 7 files changed, 835 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/modules/camel/pom.xml
----------------------------------------------------------------------
diff --git a/modules/camel/pom.xml b/modules/camel/pom.xml
new file mode 100644
index 0000000..60f0597
--- /dev/null
+++ b/modules/camel/pom.xml
@@ -0,0 +1,102 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+    POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.ignite</groupId>
+        <artifactId>ignite-parent</artifactId>
+        <version>1</version>
+        <relativePath>../../parent</relativePath>
+    </parent>
+
+    <artifactId>ignite-camel</artifactId>
+    <version>1.5.0-SNAPSHOT</version>
+    <url>http://ignite.apache.org</url>
+
+    <properties>
+        <camel.version>2.16.0</camel.version>
+        <guava.version>18.0</guava.version>
+        <okhttp.version>2.5.0</okhttp.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-core</artifactId>
+            <version>${camel.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-log4j</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-spring</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-jetty</artifactId>
+            <version>${camel.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.squareup.okhttp</groupId>
+            <artifactId>okhttp</artifactId>
+            <version>${okhttp.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
----------------------------------------------------------------------
diff --git a/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
b/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
new file mode 100644
index 0000000..40ed6b3
--- /dev/null
+++ b/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.camel;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.util.CamelContextHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.stream.StreamAdapter;
+import org.apache.ignite.stream.StreamMultipleTupleExtractor;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
+
+/**
+ * This streamer consumes messages from an Apache Camel consumer endpoint and feeds them
into an Ignite data streamer.
+ *
+ * The only mandatory properties are {@link #endpointUri} and the appropriate stream tuple
extractor (either {@link
+ * StreamSingleTupleExtractor} or {@link StreamMultipleTupleExtractor)}.
+ *
+ * The user can also provide a custom {@link CamelContext} in case they want to attach custom
components, a {@link
+ * org.apache.camel.component.properties.PropertiesComponent}, set tracers, management strategies,
etc.
+ *
+ * @see <a href="http://camel.apache.org">Apache Camel</a>
+ * @see <a href="http://camel.apache.org/components.html">Apache Camel components</a>
+ */
+public class CamelStreamer<K, V> extends StreamAdapter<Exchange, K, V> implements
Processor {
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** The Camel Context. */
+    private CamelContext camelCtx;
+
+    /** The endpoint URI to consume from. */
+    private String endpointUri;
+
+    /** Camel endpoint. */
+    private Endpoint endpoint;
+
+    /** Camel consumer. */
+    private Consumer consumer;
+
+    /** A {@link Processor} to generate the response. */
+    private Processor resProc;
+
+    /**
+     * Starts the streamer.
+     *
+     * @throws IgniteException In cases when failed to start the streamer.
+     */
+    public void start() throws IgniteException {
+        // Ensure that the endpoint URI is provided.
+        A.notNullOrEmpty(endpointUri, "endpoint URI must be provided");
+
+        // Check that one and only one tuple extractor is provided.
+        A.ensure(!(getSingleTupleExtractor() == null && getMultipleTupleExtractor()
== null),
+            "tuple extractor missing");
+
+        // If a custom CamelContext is not provided, initialize one.
+        if (camelCtx == null)
+            camelCtx = new DefaultCamelContext();
+
+        // If the Camel Context is starting or started, reject this call to start.
+        if (camelCtx.getStatus() == ServiceStatus.Started || camelCtx.getStatus() == ServiceStatus.Starting)
+            throw new IgniteException("Failed to start Camel streamer (CamelContext already
started or starting).");
+
+        log = getIgnite().log();
+
+        // Instantiate the Camel endpoint.
+        try {
+            endpoint = CamelContextHelper.getMandatoryEndpoint(camelCtx, endpointUri);
+        }
+        catch (Exception e) {
+            U.error(log, e);
+
+            throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage()
+ ']');
+        }
+
+        // Create the Camel consumer.
+        try {
+            consumer = endpoint.createConsumer(this);
+        }
+        catch (Exception e) {
+            U.error(log, e);
+
+            throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage()
+ ']');
+        }
+
+        // Start the Camel services.
+        try {
+            ServiceHelper.startServices(camelCtx, endpoint, consumer);
+        }
+        catch (Exception e) {
+            U.error(log, e);
+
+            try {
+                ServiceHelper.stopAndShutdownServices(camelCtx, endpoint, consumer);
+
+                consumer = null;
+                endpoint = null;
+            }
+            catch (Exception e1) {
+                throw new IgniteException("Failed to start Camel streamer; failed to stop
the context, endpoint or " +
+                    "consumer during rollback of failed initialization [errMsg=" + e.getMessage()
+ ", stopErrMsg=" +
+                    e1.getMessage() + ']');
+            }
+
+            throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage()
+ ']');
+        }
+
+        U.log(log, "Started Camel streamer consuming from endpoint URI: " + endpointUri);
+    }
+
+    /**
+     * Stops the streamer.
+     *
+     * @throws IgniteException In cases if failed to stop the streamer.
+     */
+    public void stop() throws IgniteException {
+        // If the Camel Context is stopping or stopped, reject this call to stop.
+        if (camelCtx.getStatus() == ServiceStatus.Stopped || camelCtx.getStatus() == ServiceStatus.Stopping)
+            throw new IgniteException("Failed to stop Camel streamer (CamelContext already
stopped or stopping).");
+
+        // Stop Camel services.
+        try {
+            ServiceHelper.stopAndShutdownServices(camelCtx, endpoint, consumer);
+        }
+        catch (Exception e) {
+            throw new IgniteException("Failed to stop Camel streamer [errMsg=" + e.getMessage()
+ ']');
+        }
+
+        U.log(log, "Stopped Camel streamer, formerly consuming from endpoint URI: " + endpointUri);
+    }
+
+    /**
+     * Processes the incoming {@link Exchange} and adds the tuple(s) to the underlying streamer.
+     *
+     * @param exchange The Camel Exchange.
+     */
+    @Override public void process(Exchange exchange) throws Exception {
+        // Extract and insert the tuple(s).
+        if (getMultipleTupleExtractor() == null) {
+            Map.Entry<K, V> entry = getSingleTupleExtractor().extract(exchange);
+            getStreamer().addData(entry);
+        }
+        else {
+            Map<K, V> entries = getMultipleTupleExtractor().extract(exchange);
+            getStreamer().addData(entries);
+        }
+
+        // If the user has set a response processor, invoke it before finishing.
+        if (resProc != null)
+            resProc.process(exchange);
+    }
+
+    /**
+     * Gets the underlying {@link CamelContext}, whether created automatically by Ignite
or the context specified by the
+     * user.
+     *
+     * @return The Camel Context.
+     */
+    public CamelContext getCamelContext() {
+        return camelCtx;
+    }
+
+    /**
+     * Explicitly sets the {@link CamelContext} to use.
+     *
+     * Doing so gives the user the opportunity to attach custom components, a {@link
+     * org.apache.camel.component.properties.PropertiesComponent}, set tracers, management
strategies, etc.
+     *
+     * @param camelCtx The Camel Context to use. In most cases, an instance of {@link DefaultCamelContext}.
+     */
+    public void setCamelContext(CamelContext camelCtx) {
+        this.camelCtx = camelCtx;
+    }
+
+    /**
+     * Gets the endpoint URI from which to consume.
+     *
+     * @return The endpoint URI.
+     */
+    public String getEndpointUri() {
+        return endpointUri;
+    }
+
+    /**
+     * Sets the endpoint URI from which to consume. <b>Mandatory.</b>
+     *
+     * @param endpointUri The endpoint URI.
+     */
+    public void setEndpointUri(String endpointUri) {
+        this.endpointUri = endpointUri;
+    }
+
+    /**
+     * Gets the {@link Processor} used to generate the response.
+     *
+     * @return The {@link Processor}.
+     */
+    public Processor getResponseProcessor() {
+        return resProc;
+    }
+
+    /**
+     * Sets the {@link Processor} used to generate the response.
+     *
+     * @param resProc The {@link Processor}.
+     */
+    public void setResponseProcessor(Processor resProc) {
+        this.resProc = resProc;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
new file mode 100644
index 0000000..4795dff
--- /dev/null
+++ b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.camel;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.component.properties.PropertiesComponent;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.LifecycleStrategySupport;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.internal.util.lang.GridMapEntry;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.stream.StreamMultipleTupleExtractor;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.squareup.okhttp.MediaType;
+import com.squareup.okhttp.OkHttpClient;
+import com.squareup.okhttp.Request;
+import com.squareup.okhttp.RequestBody;
+import com.squareup.okhttp.Response;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+
+/**
+ * Test class for {@link CamelStreamer}.
+ */
+public class IgniteCamelStreamerTest extends GridCommonAbstractTest {
+    /** text/plain media type. */
+    private static final MediaType TEXT_PLAIN = MediaType.parse("text/plain;charset=utf-8");
+
+    /** The test data. */
+    private static final Map<Integer, String> TEST_DATA = new HashMap<>();
+
+    /** The Camel streamer currently under test. */
+    private CamelStreamer<Integer, String> streamer;
+
+    /** The Ignite data streamer. */
+    private IgniteDataStreamer<Integer, String> dataStreamer;
+
+    /** URL where the REST service will be exposed. */
+    private String url;
+
+    /** The UUID of the currently active remote listener. */
+    private UUID remoteLsnr;
+
+    /** The OkHttpClient. */
+    private OkHttpClient httpClient = new OkHttpClient();
+
+    // Initialize the test data.
+    static {
+        for (int i = 0; i < 100; i++)
+            TEST_DATA.put(i, "v" + i);
+    }
+
+    /** Constructor. */
+    public IgniteCamelStreamerTest() {
+        super(true);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override public void beforeTest() throws Exception {
+        grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
+
+        // find an available local port
+        try (ServerSocket ss = new ServerSocket(0)) {
+            int port = ss.getLocalPort();
+
+            url = "http://localhost:" + port + "/ignite";
+        }
+
+        // create Camel streamer
+        dataStreamer = grid().dataStreamer(null);
+        streamer = createCamelStreamer(dataStreamer);
+    }
+
+    @Override public void afterTest() throws Exception {
+        try {
+            streamer.stop();
+        }
+        catch (Exception e) {
+            // ignore if already stopped
+        }
+
+        dataStreamer.close();
+
+        grid().cache(null).clear();
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void testSendOneEntryPerMessage() throws Exception {
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+
+        // Subscribe to cache PUT events.
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // Action time.
+        streamer.start();
+
+        // Send messages.
+        sendMessages(0, 50, false);
+
+        // Assertions.
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void testMultipleEntriesInOneMessage() throws Exception {
+        streamer.setMultipleTupleExtractor(multipleTupleExtractor());
+
+        // Subscribe to cache PUT events.
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // Action time.
+        streamer.start();
+
+        // Send messages.
+        sendMessages(0, 50, true);
+
+        // Assertions.
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void testResponseProcessorIsCalled() throws Exception {
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+        streamer.setResponseProcessor(new Processor() {
+            @Override public void process(Exchange exchange) throws Exception {
+                exchange.getOut().setBody("Foo bar");
+            }
+        });
+
+        // Subscribe to cache PUT events.
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // Action time.
+        streamer.start();
+
+        // Send messages.
+        List<String> responses = sendMessages(0, 50, false);
+
+        for (String r : responses)
+            assertEquals("Foo bar", r);
+
+        // Assertions.
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void testUserSpecifiedCamelContext() throws Exception {
+        final AtomicInteger cnt = new AtomicInteger();
+
+        // Create a CamelContext with a probe that'll help us know if it has been used.
+        CamelContext context = new DefaultCamelContext();
+        context.setTracing(true);
+        context.addLifecycleStrategy(new LifecycleStrategySupport() {
+            @Override public void onEndpointAdd(Endpoint endpoint) {
+                cnt.incrementAndGet();
+            }
+        });
+
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+        streamer.setCamelContext(context);
+
+        // Subscribe to cache PUT events.
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // Action time.
+        streamer.start();
+
+        // Send messages.
+        sendMessages(0, 50, false);
+
+        // Assertions.
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+        assertTrue(cnt.get() > 0);
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void testUserSpecifiedCamelContextWithPropertyPlaceholders() throws Exception
{
+        // Create a CamelContext with a custom property placeholder.
+        CamelContext context = new DefaultCamelContext();
+
+        PropertiesComponent pc = new PropertiesComponent("camel.test.properties");
+
+        context.addComponent("properties", pc);
+
+        // Replace the context path in the test URL with the property placeholder.
+        url = url.replaceAll("/ignite", "{{test.contextPath}}");
+
+        // Recreate the Camel streamer with the new URL.
+        streamer = createCamelStreamer(dataStreamer);
+
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+        streamer.setCamelContext(context);
+
+        // Subscribe to cache PUT events.
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // Action time.
+        streamer.start();
+
+        // Before sending the messages, get the actual URL after the property placeholder
was resolved,
+        // stripping the jetty: prefix from it.
+        url = streamer.getCamelContext().getEndpoints().iterator().next().getEndpointUri().replaceAll("jetty:",
"");
+
+        // Send messages.
+        sendMessages(0, 50, false);
+
+        // Assertions.
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void testInvalidEndpointUri() throws Exception {
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+        streamer.setEndpointUri("abc");
+
+        // Action time.
+        try {
+            streamer.start();
+            fail("Streamer started; should have failed.");
+        }
+        catch (IgniteException e) {
+            assertTrue(streamer.getCamelContext().getStatus() == ServiceStatus.Stopped);
+            assertTrue(streamer.getCamelContext().getEndpointRegistry().size() == 0);
+        }
+    }
+
+    /**
+     * Creates a Camel streamer.
+     */
+    private CamelStreamer<Integer, String> createCamelStreamer(IgniteDataStreamer<Integer,
String> dataStreamer) {
+        CamelStreamer<Integer, String> streamer = new CamelStreamer<>();
+
+        streamer.setIgnite(grid());
+        streamer.setStreamer(dataStreamer);
+        streamer.setEndpointUri("jetty:" + url);
+
+        dataStreamer.allowOverwrite(true);
+        dataStreamer.autoFlushFrequency(1);
+
+        return streamer;
+    }
+
+    /**
+     * @throws IOException
+     * @return HTTP response payloads.
+     */
+    private List<String> sendMessages(int fromIdx, int cnt, boolean singleMessage)
throws IOException {
+        List<String> responses = Lists.newArrayList();
+
+        if (singleMessage) {
+            StringBuilder sb = new StringBuilder();
+
+            for (int i = fromIdx; i < fromIdx + cnt; i++)
+                sb.append(i).append(",").append(TEST_DATA.get(i)).append("\n");
+
+            Request request = new Request.Builder()
+                .url(url)
+                .post(RequestBody.create(TEXT_PLAIN, sb.toString()))
+                .build();
+
+            Response response = httpClient.newCall(request).execute();
+
+            responses.add(response.body().string());
+        }
+        else {
+            for (int i = fromIdx; i < fromIdx + cnt; i++) {
+                String payload = i + "," + TEST_DATA.get(i);
+
+                Request request = new Request.Builder()
+                    .url(url)
+                    .post(RequestBody.create(TEXT_PLAIN, payload))
+                    .build();
+
+                Response response = httpClient.newCall(request).execute();
+
+                responses.add(response.body().string());
+            }
+        }
+
+        return responses;
+    }
+
+    /**
+     * Returns a {@link StreamSingleTupleExtractor} for testing.
+     */
+    private static StreamSingleTupleExtractor<Exchange, Integer, String> singleTupleExtractor()
{
+        return new StreamSingleTupleExtractor<Exchange, Integer, String>() {
+            @Override public Map.Entry<Integer, String> extract(Exchange exchange)
{
+                List<String> s = Splitter.on(",").splitToList(exchange.getIn().getBody(String.class));
+
+                return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1));
+            }
+        };
+    }
+
+    /**
+     * Returns a {@link StreamMultipleTupleExtractor} for testing.
+     */
+    private static StreamMultipleTupleExtractor<Exchange, Integer, String> multipleTupleExtractor()
{
+        return new StreamMultipleTupleExtractor<Exchange, Integer, String>() {
+            @Override public Map<Integer, String> extract(Exchange exchange) {
+                final Map<String, String> map = Splitter.on("\n")
+                    .omitEmptyStrings()
+                    .withKeyValueSeparator(",")
+                    .split(exchange.getIn().getBody(String.class));
+
+                final Map<Integer, String> answer = new HashMap<>();
+
+                F.forEach(map.keySet(), new IgniteInClosure<String>() {
+                    @Override public void apply(String s) {
+                        answer.put(Integer.parseInt(s), map.get(s));
+                    }
+                });
+
+                return answer;
+            }
+        };
+    }
+
+    /**
+     * Subscribe to cache put events.
+     */
+    private CountDownLatch subscribeToPutEvents(int expect) {
+        Ignite ignite = grid();
+
+        // Listen to cache PUT events and expect as many as messages as test data items
+        final CountDownLatch latch = new CountDownLatch(expect);
+        @SuppressWarnings("serial") IgniteBiPredicate<UUID, CacheEvent> callback =
+            new IgniteBiPredicate<UUID, CacheEvent>() {
+            @Override public boolean apply(UUID uuid, CacheEvent evt) {
+                latch.countDown();
+
+                return true;
+            }
+        };
+
+        remoteLsnr = ignite.events(ignite.cluster().forCacheNodes(null))
+            .remoteListen(callback, null, EVT_CACHE_OBJECT_PUT);
+
+        return latch;
+    }
+
+    /**
+     * Assert a given number of cache entries have been loaded.
+     */
+    private void assertCacheEntriesLoaded(int cnt) {
+        // get the cache and check that the entries are present
+        IgniteCache<Integer, String> cache = grid().cache(null);
+
+        // for each key from 0 to count from the TEST_DATA (ordered by key), check that the
entry is present in cache
+        for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0,
cnt))
+            assertEquals(TEST_DATA.get(key), cache.get(key));
+
+        // assert that the cache exactly the specified amount of elements
+        assertEquals(cnt, cache.size(CachePeekMode.ALL));
+
+        // remove the event listener
+        grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteLsnr);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java
b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java
new file mode 100644
index 0000000..266c9cf
--- /dev/null
+++ b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.camel;
+
+import java.util.Set;
+
+import junit.framework.TestSuite;
+
+/**
+ * Camel streamer tests.
+ */
+public class IgniteCamelStreamerTestSuite extends TestSuite {
+    /**
+     * @return {@link IgniteCamelStreamerTest} test suite.
+     * @throws Exception Thrown in case of the failure.
+     */
+    public static TestSuite suite() throws Exception {
+        return suite(null);
+    }
+
+    /**
+     * @param ignoredTests
+     * @return Test suite.
+     * @throws Exception Thrown in case of the failure.
+     */
+    public static TestSuite suite(Set<Class> ignoredTests) throws Exception {
+        TestSuite suite = new TestSuite("IgniteCamelStreamer Test Suite");
+
+        suite.addTestSuite(IgniteCamelStreamerTest.class);
+
+        return suite;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/modules/camel/src/test/resources/camel.test.properties
----------------------------------------------------------------------
diff --git a/modules/camel/src/test/resources/camel.test.properties b/modules/camel/src/test/resources/camel.test.properties
new file mode 100644
index 0000000..30459be
--- /dev/null
+++ b/modules/camel/src/test/resources/camel.test.properties
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+test.contextPath = /ignite-properties

http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
index 2cb7db7..afc1530 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
@@ -37,7 +37,6 @@ import org.apache.ignite.IgniteDataStreamer;
  * </ol>
  */
 public abstract class StreamAdapter<T, K, V> {
-
     /** Tuple extractor extracting a single tuple from an event */
     private StreamSingleTupleExtractor<T, K, V> singleTupleExtractor;
 
@@ -99,9 +98,9 @@ public abstract class StreamAdapter<T, K, V> {
      */
     @Deprecated
     public StreamTupleExtractor<T, K, V> getTupleExtractor() {
-        if (singleTupleExtractor instanceof StreamTupleExtractor) {
+        if (singleTupleExtractor instanceof StreamTupleExtractor)
             return (StreamTupleExtractor) singleTupleExtractor;
-        }
+
         throw new IllegalArgumentException("This method is deprecated and only relevant if
using an old " +
             "StreamTupleExtractor; use getSingleTupleExtractor instead");
     }
@@ -112,9 +111,9 @@ public abstract class StreamAdapter<T, K, V> {
      */
     @Deprecated
     public void setTupleExtractor(StreamTupleExtractor<T, K, V> extractor) {
-        if (multipleTupleExtractor != null) {
+        if (multipleTupleExtractor != null)
             throw new IllegalArgumentException("Multiple tuple extractor already set; cannot
set both types at once.");
-        }
+
         this.singleTupleExtractor = extractor;
     }
 
@@ -129,9 +128,9 @@ public abstract class StreamAdapter<T, K, V> {
      * @param singleTupleExtractor Extractor for key-value tuples from messages.
      */
     public void setSingleTupleExtractor(StreamSingleTupleExtractor<T, K, V> singleTupleExtractor)
{
-        if (multipleTupleExtractor != null) {
+        if (multipleTupleExtractor != null)
             throw new IllegalArgumentException("Multiple tuple extractor already set; cannot
set both types at once.");
-        }
+
         this.singleTupleExtractor = singleTupleExtractor;
     }
 
@@ -146,9 +145,9 @@ public abstract class StreamAdapter<T, K, V> {
      * @param multipleTupleExtractor Extractor for 1:n tuple extraction.
      */
     public void setMultipleTupleExtractor(StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor)
{
-        if (singleTupleExtractor != null) {
+        if (singleTupleExtractor != null)
             throw new IllegalArgumentException("Single tuple extractor already set; cannot
set both types at once.");
-        }
+
         this.multipleTupleExtractor = multipleTupleExtractor;
     }
 
@@ -188,4 +187,4 @@ public abstract class StreamAdapter<T, K, V> {
         }
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c40b551..b9c51b2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,6 +78,7 @@
         <module>modules/jms11</module>
         <module>modules/mqtt</module>
         <module>modules/zookeeper</module>
+        <module>modules/camel</module>
         <module>modules/platform</module>
     </modules>
 


Mime
View raw message