Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3581718A89 for ; Wed, 18 Nov 2015 20:22:00 +0000 (UTC) Received: (qmail 99325 invoked by uid 500); 18 Nov 2015 20:22:00 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 99199 invoked by uid 500); 18 Nov 2015 20:21:59 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 99097 invoked by uid 99); 18 Nov 2015 20:21:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Nov 2015 20:21:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7F802E0AD4; Wed, 18 Nov 2015 20:21:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Wed, 18 Nov 2015 20:22:01 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/5] ignite git commit: IGNITE-1790 Implement Apache Camel streamer. IGNITE-1790 Implement Apache Camel streamer. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c490de38 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c490de38 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c490de38 Branch: refs/heads/ignite-single-op-get Commit: c490de38c7b841fe51fec1001368bda096e82100 Parents: 175b7f2 Author: Raul Kripalani Authored: Wed Nov 18 18:03:23 2015 +0000 Committer: Raul Kripalani Committed: Wed Nov 18 18:03:23 2015 +0000 ---------------------------------------------------------------------- modules/camel/pom.xml | 102 +++++ .../ignite/stream/camel/CamelStreamer.java | 237 +++++++++++ .../stream/camel/IgniteCamelStreamerTest.java | 420 +++++++++++++++++++ .../camel/IgniteCamelStreamerTestSuite.java | 48 +++ .../src/test/resources/camel.test.properties | 18 + .../org/apache/ignite/stream/StreamAdapter.java | 19 +- pom.xml | 1 + 7 files changed, 835 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/modules/camel/pom.xml ---------------------------------------------------------------------- diff --git a/modules/camel/pom.xml b/modules/camel/pom.xml new file mode 100644 index 0000000..60f0597 --- /dev/null +++ b/modules/camel/pom.xml @@ -0,0 +1,102 @@ + + + + + + + 4.0.0 + + + org.apache.ignite + ignite-parent + 1 + ../../parent + + + ignite-camel + 1.5.0-SNAPSHOT + http://ignite.apache.org + + + 2.16.0 + 18.0 + 2.5.0 + + + + + org.apache.ignite + ignite-core + ${project.version} + + + + org.apache.camel + camel-core + ${camel.version} + + + + org.apache.ignite + ignite-log4j + ${project.version} + test + + + + org.apache.ignite + ignite-spring + ${project.version} + test + + + + org.apache.ignite + ignite-core + ${project.version} + test-jar + test + + + + com.google.guava + guava + ${guava.version} + test + + + + org.apache.camel + camel-jetty + ${camel.version} + test + + + + com.squareup.okhttp + okhttp + ${okhttp.version} + test + + + + + http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java ---------------------------------------------------------------------- diff --git a/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java b/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java new file mode 100644 index 0000000..40ed6b3 --- /dev/null +++ b/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.camel; + +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.Consumer; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.ServiceStatus; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.util.CamelContextHelper; +import org.apache.camel.util.ServiceHelper; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.stream.StreamAdapter; +import org.apache.ignite.stream.StreamMultipleTupleExtractor; +import org.apache.ignite.stream.StreamSingleTupleExtractor; + +/** + * This streamer consumes messages from an Apache Camel consumer endpoint and feeds them into an Ignite data streamer. + * + * The only mandatory properties are {@link #endpointUri} and the appropriate stream tuple extractor (either {@link + * StreamSingleTupleExtractor} or {@link StreamMultipleTupleExtractor)}. + * + * The user can also provide a custom {@link CamelContext} in case they want to attach custom components, a {@link + * org.apache.camel.component.properties.PropertiesComponent}, set tracers, management strategies, etc. + * + * @see Apache Camel + * @see Apache Camel components + */ +public class CamelStreamer extends StreamAdapter implements Processor { + /** Logger. */ + private IgniteLogger log; + + /** The Camel Context. */ + private CamelContext camelCtx; + + /** The endpoint URI to consume from. */ + private String endpointUri; + + /** Camel endpoint. */ + private Endpoint endpoint; + + /** Camel consumer. */ + private Consumer consumer; + + /** A {@link Processor} to generate the response. */ + private Processor resProc; + + /** + * Starts the streamer. + * + * @throws IgniteException In cases when failed to start the streamer. + */ + public void start() throws IgniteException { + // Ensure that the endpoint URI is provided. + A.notNullOrEmpty(endpointUri, "endpoint URI must be provided"); + + // Check that one and only one tuple extractor is provided. + A.ensure(!(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null), + "tuple extractor missing"); + + // If a custom CamelContext is not provided, initialize one. + if (camelCtx == null) + camelCtx = new DefaultCamelContext(); + + // If the Camel Context is starting or started, reject this call to start. + if (camelCtx.getStatus() == ServiceStatus.Started || camelCtx.getStatus() == ServiceStatus.Starting) + throw new IgniteException("Failed to start Camel streamer (CamelContext already started or starting)."); + + log = getIgnite().log(); + + // Instantiate the Camel endpoint. + try { + endpoint = CamelContextHelper.getMandatoryEndpoint(camelCtx, endpointUri); + } + catch (Exception e) { + U.error(log, e); + + throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']'); + } + + // Create the Camel consumer. + try { + consumer = endpoint.createConsumer(this); + } + catch (Exception e) { + U.error(log, e); + + throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']'); + } + + // Start the Camel services. + try { + ServiceHelper.startServices(camelCtx, endpoint, consumer); + } + catch (Exception e) { + U.error(log, e); + + try { + ServiceHelper.stopAndShutdownServices(camelCtx, endpoint, consumer); + + consumer = null; + endpoint = null; + } + catch (Exception e1) { + throw new IgniteException("Failed to start Camel streamer; failed to stop the context, endpoint or " + + "consumer during rollback of failed initialization [errMsg=" + e.getMessage() + ", stopErrMsg=" + + e1.getMessage() + ']'); + } + + throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']'); + } + + U.log(log, "Started Camel streamer consuming from endpoint URI: " + endpointUri); + } + + /** + * Stops the streamer. + * + * @throws IgniteException In cases if failed to stop the streamer. + */ + public void stop() throws IgniteException { + // If the Camel Context is stopping or stopped, reject this call to stop. + if (camelCtx.getStatus() == ServiceStatus.Stopped || camelCtx.getStatus() == ServiceStatus.Stopping) + throw new IgniteException("Failed to stop Camel streamer (CamelContext already stopped or stopping)."); + + // Stop Camel services. + try { + ServiceHelper.stopAndShutdownServices(camelCtx, endpoint, consumer); + } + catch (Exception e) { + throw new IgniteException("Failed to stop Camel streamer [errMsg=" + e.getMessage() + ']'); + } + + U.log(log, "Stopped Camel streamer, formerly consuming from endpoint URI: " + endpointUri); + } + + /** + * Processes the incoming {@link Exchange} and adds the tuple(s) to the underlying streamer. + * + * @param exchange The Camel Exchange. + */ + @Override public void process(Exchange exchange) throws Exception { + // Extract and insert the tuple(s). + if (getMultipleTupleExtractor() == null) { + Map.Entry entry = getSingleTupleExtractor().extract(exchange); + getStreamer().addData(entry); + } + else { + Map entries = getMultipleTupleExtractor().extract(exchange); + getStreamer().addData(entries); + } + + // If the user has set a response processor, invoke it before finishing. + if (resProc != null) + resProc.process(exchange); + } + + /** + * Gets the underlying {@link CamelContext}, whether created automatically by Ignite or the context specified by the + * user. + * + * @return The Camel Context. + */ + public CamelContext getCamelContext() { + return camelCtx; + } + + /** + * Explicitly sets the {@link CamelContext} to use. + * + * Doing so gives the user the opportunity to attach custom components, a {@link + * org.apache.camel.component.properties.PropertiesComponent}, set tracers, management strategies, etc. + * + * @param camelCtx The Camel Context to use. In most cases, an instance of {@link DefaultCamelContext}. + */ + public void setCamelContext(CamelContext camelCtx) { + this.camelCtx = camelCtx; + } + + /** + * Gets the endpoint URI from which to consume. + * + * @return The endpoint URI. + */ + public String getEndpointUri() { + return endpointUri; + } + + /** + * Sets the endpoint URI from which to consume. Mandatory. + * + * @param endpointUri The endpoint URI. + */ + public void setEndpointUri(String endpointUri) { + this.endpointUri = endpointUri; + } + + /** + * Gets the {@link Processor} used to generate the response. + * + * @return The {@link Processor}. + */ + public Processor getResponseProcessor() { + return resProc; + } + + /** + * Sets the {@link Processor} used to generate the response. + * + * @param resProc The {@link Processor}. + */ + public void setResponseProcessor(Processor resProc) { + this.resProc = resProc; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java new file mode 100644 index 0000000..4795dff --- /dev/null +++ b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.camel; + +import java.io.IOException; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.ServiceStatus; +import org.apache.camel.component.properties.PropertiesComponent; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.support.LifecycleStrategySupport; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.events.CacheEvent; +import org.apache.ignite.internal.util.lang.GridMapEntry; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.stream.StreamMultipleTupleExtractor; +import org.apache.ignite.stream.StreamSingleTupleExtractor; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; +import com.squareup.okhttp.MediaType; +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.Request; +import com.squareup.okhttp.RequestBody; +import com.squareup.okhttp.Response; + +import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT; + +/** + * Test class for {@link CamelStreamer}. + */ +public class IgniteCamelStreamerTest extends GridCommonAbstractTest { + /** text/plain media type. */ + private static final MediaType TEXT_PLAIN = MediaType.parse("text/plain;charset=utf-8"); + + /** The test data. */ + private static final Map TEST_DATA = new HashMap<>(); + + /** The Camel streamer currently under test. */ + private CamelStreamer streamer; + + /** The Ignite data streamer. */ + private IgniteDataStreamer dataStreamer; + + /** URL where the REST service will be exposed. */ + private String url; + + /** The UUID of the currently active remote listener. */ + private UUID remoteLsnr; + + /** The OkHttpClient. */ + private OkHttpClient httpClient = new OkHttpClient(); + + // Initialize the test data. + static { + for (int i = 0; i < 100; i++) + TEST_DATA.put(i, "v" + i); + } + + /** Constructor. */ + public IgniteCamelStreamerTest() { + super(true); + } + + @SuppressWarnings("unchecked") + @Override public void beforeTest() throws Exception { + grid().getOrCreateCache(defaultCacheConfiguration()); + + // find an available local port + try (ServerSocket ss = new ServerSocket(0)) { + int port = ss.getLocalPort(); + + url = "http://localhost:" + port + "/ignite"; + } + + // create Camel streamer + dataStreamer = grid().dataStreamer(null); + streamer = createCamelStreamer(dataStreamer); + } + + @Override public void afterTest() throws Exception { + try { + streamer.stop(); + } + catch (Exception e) { + // ignore if already stopped + } + + dataStreamer.close(); + + grid().cache(null).clear(); + } + + /** + * @throws Exception + */ + public void testSendOneEntryPerMessage() throws Exception { + streamer.setSingleTupleExtractor(singleTupleExtractor()); + + // Subscribe to cache PUT events. + CountDownLatch latch = subscribeToPutEvents(50); + + // Action time. + streamer.start(); + + // Send messages. + sendMessages(0, 50, false); + + // Assertions. + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertCacheEntriesLoaded(50); + } + + /** + * @throws Exception + */ + public void testMultipleEntriesInOneMessage() throws Exception { + streamer.setMultipleTupleExtractor(multipleTupleExtractor()); + + // Subscribe to cache PUT events. + CountDownLatch latch = subscribeToPutEvents(50); + + // Action time. + streamer.start(); + + // Send messages. + sendMessages(0, 50, true); + + // Assertions. + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertCacheEntriesLoaded(50); + } + + /** + * @throws Exception + */ + public void testResponseProcessorIsCalled() throws Exception { + streamer.setSingleTupleExtractor(singleTupleExtractor()); + streamer.setResponseProcessor(new Processor() { + @Override public void process(Exchange exchange) throws Exception { + exchange.getOut().setBody("Foo bar"); + } + }); + + // Subscribe to cache PUT events. + CountDownLatch latch = subscribeToPutEvents(50); + + // Action time. + streamer.start(); + + // Send messages. + List responses = sendMessages(0, 50, false); + + for (String r : responses) + assertEquals("Foo bar", r); + + // Assertions. + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertCacheEntriesLoaded(50); + } + + /** + * @throws Exception + */ + public void testUserSpecifiedCamelContext() throws Exception { + final AtomicInteger cnt = new AtomicInteger(); + + // Create a CamelContext with a probe that'll help us know if it has been used. + CamelContext context = new DefaultCamelContext(); + context.setTracing(true); + context.addLifecycleStrategy(new LifecycleStrategySupport() { + @Override public void onEndpointAdd(Endpoint endpoint) { + cnt.incrementAndGet(); + } + }); + + streamer.setSingleTupleExtractor(singleTupleExtractor()); + streamer.setCamelContext(context); + + // Subscribe to cache PUT events. + CountDownLatch latch = subscribeToPutEvents(50); + + // Action time. + streamer.start(); + + // Send messages. + sendMessages(0, 50, false); + + // Assertions. + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertCacheEntriesLoaded(50); + assertTrue(cnt.get() > 0); + } + + /** + * @throws Exception + */ + public void testUserSpecifiedCamelContextWithPropertyPlaceholders() throws Exception { + // Create a CamelContext with a custom property placeholder. + CamelContext context = new DefaultCamelContext(); + + PropertiesComponent pc = new PropertiesComponent("camel.test.properties"); + + context.addComponent("properties", pc); + + // Replace the context path in the test URL with the property placeholder. + url = url.replaceAll("/ignite", "{{test.contextPath}}"); + + // Recreate the Camel streamer with the new URL. + streamer = createCamelStreamer(dataStreamer); + + streamer.setSingleTupleExtractor(singleTupleExtractor()); + streamer.setCamelContext(context); + + // Subscribe to cache PUT events. + CountDownLatch latch = subscribeToPutEvents(50); + + // Action time. + streamer.start(); + + // Before sending the messages, get the actual URL after the property placeholder was resolved, + // stripping the jetty: prefix from it. + url = streamer.getCamelContext().getEndpoints().iterator().next().getEndpointUri().replaceAll("jetty:", ""); + + // Send messages. + sendMessages(0, 50, false); + + // Assertions. + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertCacheEntriesLoaded(50); + } + + /** + * @throws Exception + */ + public void testInvalidEndpointUri() throws Exception { + streamer.setSingleTupleExtractor(singleTupleExtractor()); + streamer.setEndpointUri("abc"); + + // Action time. + try { + streamer.start(); + fail("Streamer started; should have failed."); + } + catch (IgniteException e) { + assertTrue(streamer.getCamelContext().getStatus() == ServiceStatus.Stopped); + assertTrue(streamer.getCamelContext().getEndpointRegistry().size() == 0); + } + } + + /** + * Creates a Camel streamer. + */ + private CamelStreamer createCamelStreamer(IgniteDataStreamer dataStreamer) { + CamelStreamer streamer = new CamelStreamer<>(); + + streamer.setIgnite(grid()); + streamer.setStreamer(dataStreamer); + streamer.setEndpointUri("jetty:" + url); + + dataStreamer.allowOverwrite(true); + dataStreamer.autoFlushFrequency(1); + + return streamer; + } + + /** + * @throws IOException + * @return HTTP response payloads. + */ + private List sendMessages(int fromIdx, int cnt, boolean singleMessage) throws IOException { + List responses = Lists.newArrayList(); + + if (singleMessage) { + StringBuilder sb = new StringBuilder(); + + for (int i = fromIdx; i < fromIdx + cnt; i++) + sb.append(i).append(",").append(TEST_DATA.get(i)).append("\n"); + + Request request = new Request.Builder() + .url(url) + .post(RequestBody.create(TEXT_PLAIN, sb.toString())) + .build(); + + Response response = httpClient.newCall(request).execute(); + + responses.add(response.body().string()); + } + else { + for (int i = fromIdx; i < fromIdx + cnt; i++) { + String payload = i + "," + TEST_DATA.get(i); + + Request request = new Request.Builder() + .url(url) + .post(RequestBody.create(TEXT_PLAIN, payload)) + .build(); + + Response response = httpClient.newCall(request).execute(); + + responses.add(response.body().string()); + } + } + + return responses; + } + + /** + * Returns a {@link StreamSingleTupleExtractor} for testing. + */ + private static StreamSingleTupleExtractor singleTupleExtractor() { + return new StreamSingleTupleExtractor() { + @Override public Map.Entry extract(Exchange exchange) { + List s = Splitter.on(",").splitToList(exchange.getIn().getBody(String.class)); + + return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1)); + } + }; + } + + /** + * Returns a {@link StreamMultipleTupleExtractor} for testing. + */ + private static StreamMultipleTupleExtractor multipleTupleExtractor() { + return new StreamMultipleTupleExtractor() { + @Override public Map extract(Exchange exchange) { + final Map map = Splitter.on("\n") + .omitEmptyStrings() + .withKeyValueSeparator(",") + .split(exchange.getIn().getBody(String.class)); + + final Map answer = new HashMap<>(); + + F.forEach(map.keySet(), new IgniteInClosure() { + @Override public void apply(String s) { + answer.put(Integer.parseInt(s), map.get(s)); + } + }); + + return answer; + } + }; + } + + /** + * Subscribe to cache put events. + */ + private CountDownLatch subscribeToPutEvents(int expect) { + Ignite ignite = grid(); + + // Listen to cache PUT events and expect as many as messages as test data items + final CountDownLatch latch = new CountDownLatch(expect); + @SuppressWarnings("serial") IgniteBiPredicate callback = + new IgniteBiPredicate() { + @Override public boolean apply(UUID uuid, CacheEvent evt) { + latch.countDown(); + + return true; + } + }; + + remoteLsnr = ignite.events(ignite.cluster().forCacheNodes(null)) + .remoteListen(callback, null, EVT_CACHE_OBJECT_PUT); + + return latch; + } + + /** + * Assert a given number of cache entries have been loaded. + */ + private void assertCacheEntriesLoaded(int cnt) { + // get the cache and check that the entries are present + IgniteCache cache = grid().cache(null); + + // for each key from 0 to count from the TEST_DATA (ordered by key), check that the entry is present in cache + for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, cnt)) + assertEquals(TEST_DATA.get(key), cache.get(key)); + + // assert that the cache exactly the specified amount of elements + assertEquals(cnt, cache.size(CachePeekMode.ALL)); + + // remove the event listener + grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteLsnr); + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java new file mode 100644 index 0000000..266c9cf --- /dev/null +++ b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.camel; + +import java.util.Set; + +import junit.framework.TestSuite; + +/** + * Camel streamer tests. + */ +public class IgniteCamelStreamerTestSuite extends TestSuite { + /** + * @return {@link IgniteCamelStreamerTest} test suite. + * @throws Exception Thrown in case of the failure. + */ + public static TestSuite suite() throws Exception { + return suite(null); + } + + /** + * @param ignoredTests + * @return Test suite. + * @throws Exception Thrown in case of the failure. + */ + public static TestSuite suite(Set ignoredTests) throws Exception { + TestSuite suite = new TestSuite("IgniteCamelStreamer Test Suite"); + + suite.addTestSuite(IgniteCamelStreamerTest.class); + + return suite; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/modules/camel/src/test/resources/camel.test.properties ---------------------------------------------------------------------- diff --git a/modules/camel/src/test/resources/camel.test.properties b/modules/camel/src/test/resources/camel.test.properties new file mode 100644 index 0000000..30459be --- /dev/null +++ b/modules/camel/src/test/resources/camel.test.properties @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +test.contextPath = /ignite-properties http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java index 2cb7db7..afc1530 100644 --- a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java @@ -37,7 +37,6 @@ import org.apache.ignite.IgniteDataStreamer; * */ public abstract class StreamAdapter { - /** Tuple extractor extracting a single tuple from an event */ private StreamSingleTupleExtractor singleTupleExtractor; @@ -99,9 +98,9 @@ public abstract class StreamAdapter { */ @Deprecated public StreamTupleExtractor getTupleExtractor() { - if (singleTupleExtractor instanceof StreamTupleExtractor) { + if (singleTupleExtractor instanceof StreamTupleExtractor) return (StreamTupleExtractor) singleTupleExtractor; - } + throw new IllegalArgumentException("This method is deprecated and only relevant if using an old " + "StreamTupleExtractor; use getSingleTupleExtractor instead"); } @@ -112,9 +111,9 @@ public abstract class StreamAdapter { */ @Deprecated public void setTupleExtractor(StreamTupleExtractor extractor) { - if (multipleTupleExtractor != null) { + if (multipleTupleExtractor != null) throw new IllegalArgumentException("Multiple tuple extractor already set; cannot set both types at once."); - } + this.singleTupleExtractor = extractor; } @@ -129,9 +128,9 @@ public abstract class StreamAdapter { * @param singleTupleExtractor Extractor for key-value tuples from messages. */ public void setSingleTupleExtractor(StreamSingleTupleExtractor singleTupleExtractor) { - if (multipleTupleExtractor != null) { + if (multipleTupleExtractor != null) throw new IllegalArgumentException("Multiple tuple extractor already set; cannot set both types at once."); - } + this.singleTupleExtractor = singleTupleExtractor; } @@ -146,9 +145,9 @@ public abstract class StreamAdapter { * @param multipleTupleExtractor Extractor for 1:n tuple extraction. */ public void setMultipleTupleExtractor(StreamMultipleTupleExtractor multipleTupleExtractor) { - if (singleTupleExtractor != null) { + if (singleTupleExtractor != null) throw new IllegalArgumentException("Single tuple extractor already set; cannot set both types at once."); - } + this.multipleTupleExtractor = multipleTupleExtractor; } @@ -188,4 +187,4 @@ public abstract class StreamAdapter { } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index c40b551..b9c51b2 100644 --- a/pom.xml +++ b/pom.xml @@ -78,6 +78,7 @@ modules/jms11 modules/mqtt modules/zookeeper + modules/camel modules/platform