camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acosent...@apache.org
Subject [1/2] camel git commit: CAMEL-11475 - Camel-Caffeine: Add idempotent repository to component
Date Fri, 30 Jun 2017 08:53:30 GMT
Repository: camel
Updated Branches:
  refs/heads/master 52cede016 -> bac9ddd4b


CAMEL-11475 - Camel-Caffeine: Add idempotent repository to component


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/222f287b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/222f287b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/222f287b

Branch: refs/heads/master
Commit: 222f287b9d8deb01c65568a06847874aba3d6879
Parents: 52cede0
Author: Andrea Cosentino <ancosen@gmail.com>
Authored: Fri Jun 30 10:37:32 2017 +0200
Committer: Andrea Cosentino <ancosen@gmail.com>
Committed: Fri Jun 30 10:52:33 2017 +0200

----------------------------------------------------------------------
 .../CaffeineIdempotentRepository.java           |  97 +++++++++++++
 .../CaffeineIdempotentRepositoryTest.java       | 138 +++++++++++++++++++
 2 files changed, 235 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/222f287b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/processor/idempotent/CaffeineIdempotentRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/processor/idempotent/CaffeineIdempotentRepository.java b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/processor/idempotent/CaffeineIdempotentRepository.java
new file mode 100644
index 0000000..b1f9880
--- /dev/null
+++ b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/processor/idempotent/CaffeineIdempotentRepository.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.caffeine.processor.idempotent;
+
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedOperation;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.support.ServiceSupport;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+/**
+ * An {@link IdempotentRepository} that records message ids as keys of an
+ * in-memory Caffeine {@link Cache}.
+ * <p/>
+ * The cache is only created in {@link #doStart()}, so the key operations
+ * dereference a {@code null} cache if they are invoked before the service
+ * has been started.
+ */
+@ManagedResource(description = "Caffeine based message id repository")
+public class CaffeineIdempotentRepository extends ServiceSupport implements IdempotentRepository<String> {
+
+    private final String cacheName;
+    private Cache<String, Boolean> cache;
+
+    /**
+     * Creates a repository named after this class's simple name.
+     */
+    public CaffeineIdempotentRepository() {
+        this(CaffeineIdempotentRepository.class.getSimpleName());
+    }
+
+    /**
+     * Creates a repository with the given name.
+     *
+     * @param repositoryName the name reported by {@link #getCacheName()}
+     */
+    public CaffeineIdempotentRepository(String repositoryName) {
+        this.cacheName = repositoryName;
+    }
+
+    /**
+     * @return the name given to this repository at construction time
+     */
+    @ManagedAttribute(description = "The processor name")
+    public String getCacheName() {
+        return cacheName;
+    }
+
+    /**
+     * Adds the key to the repository unless it is already present.
+     *
+     * @param key the message id
+     * @return {@code true} if the key was newly added, {@code false} if it
+     *         was already in the repository
+     */
+    @Override
+    @ManagedOperation(description = "Adds the key to the store")
+    public boolean add(String key) {
+        // asMap() is a ConcurrentMap view of the cache; putIfAbsent checks and
+        // inserts in one atomic step, so two threads racing on the same key
+        // cannot both be told they were the first to add it (which a separate
+        // containsKey() followed by put() would allow).
+        return cache.asMap().putIfAbsent(key, Boolean.TRUE) == null;
+    }
+
+    /**
+     * @param key the message id
+     * @return {@code true} if the key is currently in the repository
+     */
+    @Override
+    public boolean confirm(String key) {
+        return cache.asMap().containsKey(key);
+    }
+
+    /**
+     * @param key the message id
+     * @return {@code true} if the key is currently in the repository
+     */
+    @Override
+    @ManagedOperation(description = "Does the store contain the given key")
+    public boolean contains(String key) {
+        return cache.asMap().containsKey(key);
+    }
+
+    /**
+     * Removes the key from the repository.
+     *
+     * @param key the message id
+     * @return always {@code true}, whether or not the key was present
+     */
+    @Override
+    @ManagedOperation(description = "Remove the key from the store")
+    public boolean remove(String key) {
+        cache.invalidate(key);
+        return true;
+    }
+
+    /**
+     * Removes every key from the repository.
+     */
+    @Override
+    @ManagedOperation(description = "Clear the store")
+    public void clear() {
+        cache.invalidateAll();
+    }
+
+    /**
+     * Creates the backing cache on first start; a cache left over from an
+     * earlier start is reused. The builder is used without any further
+     * configuration.
+     */
+    @Override
+    protected void doStart() throws Exception {
+        if (cache == null) {
+            cache = Caffeine.newBuilder().build();
+        }
+    }
+
+    /**
+     * @return the backing cache, or {@code null} if the repository has never
+     *         been started
+     */
+    protected Cache<String, Boolean> getCache() {
+        return this.cache;
+    }
+
+    /**
+     * Nothing to release; the cache and its entries are kept across a stop.
+     */
+    @Override
+    protected void doStop() throws Exception {
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/222f287b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/idempotent/CaffeineIdempotentRepositoryTest.java
----------------------------------------------------------------------
diff --git a/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/idempotent/CaffeineIdempotentRepositoryTest.java b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/idempotent/CaffeineIdempotentRepositoryTest.java
new file mode 100644
index 0000000..e2eef89
--- /dev/null
+++ b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/idempotent/CaffeineIdempotentRepositoryTest.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.caffeine.processor.idempotent;
+
+import java.util.UUID;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import com.github.benmanes.caffeine.cache.Cache;
+
+/**
+ * Exercises {@link CaffeineIdempotentRepository} both directly and as the
+ * repository of an idempotent consumer in a route.
+ */
+public class CaffeineIdempotentRepositoryTest extends CamelTestSupport {
+
+    private CaffeineIdempotentRepository repo;
+    private Cache<String, Boolean> cache;
+    private String key01;
+    private String key02;
+
+    /**
+     * Creates a fresh repository and two random keys before each test; the
+     * repository is the one referenced by the route in
+     * {@link #createRouteBuilder()}.
+     */
+    @Override
+    protected void doPreSetup() throws Exception {
+        super.doPreSetup();
+
+        repo = new CaffeineIdempotentRepository("test");
+
+        key01 = generateRandomString();
+        key02 = generateRandomString();
+    }
+
+    @Test
+    public void testAdd() throws Exception {
+        // add first key
+        assertTrue(repo.add(key01));
+        assertTrue(repo.getCache().asMap().containsKey(key01));
+
+        // try to add the same key again
+        assertFalse(repo.add(key01));
+
+        // try to add another one
+        assertTrue(repo.add(key02));
+        assertTrue(repo.getCache().asMap().containsKey(key02));
+    }
+
+    @Test
+    public void testConfirm() throws Exception {
+        // add first key and confirm
+        assertTrue(repo.add(key01));
+        assertTrue(repo.confirm(key01));
+
+        // try to confirm a key that isn't there
+        assertFalse(repo.confirm(key02));
+    }
+
+    @Test
+    public void testContains() throws Exception {
+        assertFalse(repo.contains(key01));
+
+        // add key and check again
+        assertTrue(repo.add(key01));
+        assertTrue(repo.contains(key01));
+    }
+
+    @Test
+    public void testClear() throws Exception {
+        // add keys to clear
+        assertTrue(repo.add(key01));
+        assertTrue(repo.add(key02));
+        assertTrue(repo.getCache().asMap().containsKey(key01));
+        assertTrue(repo.getCache().asMap().containsKey(key02));
+
+        // clear repo
+        repo.clear();
+        assertFalse(repo.getCache().asMap().containsKey(key01));
+        assertFalse(repo.getCache().asMap().containsKey(key02));
+    }
+
+    @Test
+    public void testRemove() throws Exception {
+        // add key to remove
+        assertTrue(repo.add(key01));
+        assertTrue(repo.confirm(key01));
+
+        // remove key
+        assertTrue(repo.remove(key01));
+        assertFalse(repo.confirm(key01));
+
+        // try to remove a key that isn't there
+        repo.remove(key02);
+    }
+
+    @Test
+    public void testRepositoryInRoute() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:out");
+        mock.expectedBodiesReceived("a", "b");
+        // c is a duplicate
+
+        // should be started
+        assertTrue("Should be started", repo.getStatus().isStarted());
+
+        // send 3 messages, one of them with a duplicated key (key01)
+        template.sendBodyAndHeader("direct://in", "a", "messageId", key01);
+        template.sendBodyAndHeader("direct://in", "b", "messageId", key02);
+        template.sendBodyAndHeader("direct://in", "c", "messageId", key01);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    /**
+     * Routes {@code direct://in} to {@code mock://out} through an idempotent
+     * consumer keyed on the {@code messageId} header.
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct://in")
+                    .idempotentConsumer(header("messageId"), repo)
+                    .to("mock://out");
+            }
+        };
+    }
+
+    /**
+     * @return the string form of a freshly generated random {@link UUID}
+     */
+    protected static String generateRandomString() {
+        return UUID.randomUUID().toString();
+    }
+}


Mime
View raw message