camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lburgazz...@apache.org
Subject [11/13] camel git commit: CAMEL-10054: Create camel-atomix component
Date Fri, 16 Jun 2017 15:39:12 GMT
CAMEL-10054: Create camel-atomix component


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4d7fbb54
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4d7fbb54
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4d7fbb54

Branch: refs/heads/master
Commit: 4d7fbb545710a1b9918e92c1f7b3c4dae5f1d72e
Parents: 2573fe6
Author: lburgazzoli <lburgazzoli@gmail.com>
Authored: Wed Jun 14 10:33:30 2017 +0200
Committer: lburgazzoli <lburgazzoli@gmail.com>
Committed: Fri Jun 16 17:37:55 2017 +0200

----------------------------------------------------------------------
 .../component/atomix/AtomixConfiguration.java   |   1 +
 .../atomix/client/AtomixClientComponent.java    |   1 +
 .../client/AtomixClientConfiguration.java       |   1 +
 .../atomix/cluster/AtomixClusterComponent.java  |   1 +
 .../cluster/AtomixClusterConfiguration.java     |   1 +
 .../atomix/cluster/AtomixClusterHelper.java     |   1 +
 .../component/atomix/ha/AtomixCluster.java      | 191 -------------------
 .../atomix/ha/AtomixClusterService.java         | 180 +++++++++++++++++
 .../component/atomix/ha/AtomixClusterView.java  |  29 +--
 .../services/org/apache/camel/ha/atomix         |   2 +-
 .../atomix/ha/AtomixRoutePolicyMain.java        |  75 --------
 .../atomix/ha/AtomixRoutePolicyTest.java        | 116 +++++++++++
 .../src/test/resources/log4j2.properties        |   2 +-
 13 files changed, 322 insertions(+), 279 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/4d7fbb54/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfiguration.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfiguration.java
index 4504488..d0e3203 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfiguration.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfiguration.java
@@ -12,6 +12,7 @@
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.camel.component.atomix;
 

http://git-wip-us.apache.org/repos/asf/camel/blob/4d7fbb54/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientComponent.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientComponent.java
index c4348c4..775315b 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientComponent.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientComponent.java
@@ -12,6 +12,7 @@
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.camel.component.atomix.client;
 

http://git-wip-us.apache.org/repos/asf/camel/blob/4d7fbb54/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConfiguration.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConfiguration.java
index 628f88b..d3134c4 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConfiguration.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConfiguration.java
@@ -12,6 +12,7 @@
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.camel.component.atomix.client;
 

http://git-wip-us.apache.org/repos/asf/camel/blob/4d7fbb54/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterComponent.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterComponent.java
index fa1cace..8db6487 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterComponent.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterComponent.java
@@ -12,6 +12,7 @@
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.camel.component.atomix.cluster;
 

http://git-wip-us.apache.org/repos/asf/camel/blob/4d7fbb54/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java
index 6a61eb9..8a732e1 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java
@@ -12,6 +12,7 @@
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.camel.component.atomix.cluster;
 

http://git-wip-us.apache.org/repos/asf/camel/blob/4d7fbb54/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterHelper.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterHelper.java
index 4e03627..777a4f6 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterHelper.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterHelper.java
@@ -12,6 +12,7 @@
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.camel.component.atomix.cluster;
 

http://git-wip-us.apache.org/repos/asf/camel/blob/4d7fbb54/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixCluster.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixCluster.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixCluster.java
deleted file mode 100644
index ac8dbc7..0000000
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixCluster.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.atomix.ha;
-
-import java.util.List;
-
-import io.atomix.Atomix;
-import io.atomix.AtomixReplica;
-import io.atomix.catalyst.transport.Address;
-import io.atomix.catalyst.transport.Transport;
-import io.atomix.copycat.server.storage.StorageLevel;
-import org.apache.camel.CamelContext;
-import org.apache.camel.component.atomix.cluster.AtomixClusterConfiguration;
-import org.apache.camel.component.atomix.cluster.AtomixClusterHelper;
-import org.apache.camel.impl.ha.AbstractCamelCluster;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class AtomixCluster extends AbstractCamelCluster<AtomixClusterView> {
-    private static final Logger LOGGER = LoggerFactory.getLogger(AtomixCluster.class);
-
-    private CamelContext camelContext;
-    private Address address;
-    private AtomixClusterConfiguration configuration;
-    private AtomixReplica atomix;
-
-    public AtomixCluster() {
-        super("atomix");
-
-        this.configuration = new AtomixClusterConfiguration();
-    }
-
-    public AtomixCluster(CamelContext camelContext, Address address, AtomixClusterConfiguration configuration) {
-        super("atomix");
-
-        this.camelContext = camelContext;
-        this.address = address;
-        this.configuration = configuration.copy();
-    }
-
-    // **********************************
-    // Properties
-    // **********************************
-
-    @Override
-    public CamelContext getCamelContext() {
-        return camelContext;
-    }
-
-    @Override
-    public void setCamelContext(CamelContext camelContext) {
-        this.camelContext = camelContext;
-    }
-
-    public Address getAddress() {
-        return address;
-    }
-
-    public void setAddress(String address) {
-        this.address = new Address(address);
-    }
-
-    public void setAddress(Address address) {
-        this.address = address;
-    }
-
-    public AtomixClusterConfiguration getConfiguration() {
-        return configuration;
-    }
-
-    public void setConfiguration(AtomixClusterConfiguration configuration) {
-        this.configuration = configuration.copy();
-    }
-
-    public String getStoragePath() {
-        return configuration.getStoragePath();
-    }
-
-    public void setStoragePath(String storagePath) {
-        configuration.setStoragePath(storagePath);
-    }
-
-    public List<Address> getNodes() {
-        return configuration.getNodes();
-    }
-
-    public StorageLevel getStorageLevel() {
-        return configuration.getStorageLevel();
-    }
-
-    public void setNodes(List<Address> nodes) {
-        configuration.setNodes(nodes);
-    }
-
-    public void setStorageLevel(StorageLevel storageLevel) {
-        configuration.setStorageLevel(storageLevel);
-    }
-
-    public void setNodes(String nodes) {
-        configuration.setNodes(nodes);
-    }
-
-    public Class<? extends Transport> getTransport() {
-        return configuration.getTransport();
-    }
-
-    public void setTransport(Class<? extends Transport> transport) {
-        configuration.setTransport(transport);
-    }
-
-    public String getReplicaRef() {
-        return configuration.getReplicaRef();
-    }
-
-    public void setReplicaRef(String clusterref) {
-        configuration.setReplicaRef(clusterref);
-    }
-
-    public Atomix getReplica() {
-        return configuration.getReplica();
-    }
-
-    public void setReplica(AtomixReplica replica) {
-        configuration.setReplica(replica);
-    }
-
-    public String getConfigurationUri() {
-        return configuration.getConfigurationUri();
-    }
-
-    public void setConfigurationUri(String configurationUri) {
-        configuration.setConfigurationUri(configurationUri);
-    }
-
-    // *********************************************
-    // Lifecycle
-    // *********************************************
-
-    @Override
-    protected void doStart() throws Exception {
-        // Assume that if addresses are provided the cluster needs be bootstrapped.
-        if (ObjectHelper.isNotEmpty(configuration.getNodes())) {
-            LOGGER.debug("Bootstrap cluster on address {} for nodes: {}", address, configuration.getNodes());
-            getOrCreateAtomix().bootstrap(configuration.getNodes()).join();
-            LOGGER.debug("Bootstrap cluster done");
-        }
-
-        super.doStart();
-    }
-
-    @Override
-    protected AtomixClusterView createView(String namespace) throws Exception {
-        return new AtomixClusterView(this, namespace, getOrCreateAtomix());
-    }
-
-
-    private AtomixReplica getOrCreateAtomix() throws Exception {
-        if (atomix == null) {
-            // Validate parameters
-            ObjectHelper.notNull(camelContext, "Camel Context");
-            ObjectHelper.notNull(address, "Atomix Node Address");
-            ObjectHelper.notNull(configuration, "Atomix Node Configuration");
-
-            atomix = AtomixClusterHelper.createReplica(camelContext, address, configuration);
-
-            // Assume that if addresses are provided the cluster needs be bootstrapped.
-            if (ObjectHelper.isNotEmpty(configuration.getNodes())) {
-                LOGGER.debug("Bootstrap cluster on address {} for nodes: {}", address, configuration.getNodes());
-                this.atomix.bootstrap(configuration.getNodes()).join();
-                LOGGER.debug("Bootstrap cluster done");
-            }
-        }
-
-        return this.atomix;
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/4d7fbb54/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
new file mode 100644
index 0000000..6a6a750
--- /dev/null
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.ha;
+
+import java.util.List;
+
+import io.atomix.Atomix;
+import io.atomix.AtomixReplica;
+import io.atomix.catalyst.transport.Address;
+import io.atomix.catalyst.transport.Transport;
+import io.atomix.copycat.server.storage.StorageLevel;
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.atomix.cluster.AtomixClusterConfiguration;
+import org.apache.camel.component.atomix.cluster.AtomixClusterHelper;
+import org.apache.camel.impl.ha.AbstractCamelClusterService;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class AtomixClusterService extends AbstractCamelClusterService<AtomixClusterView> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(AtomixClusterService.class);
+
+    private Address address;
+    private AtomixClusterConfiguration configuration;
+    private AtomixReplica atomix;
+
+    public AtomixClusterService() {
+        this.configuration = new AtomixClusterConfiguration();
+    }
+
+    public AtomixClusterService(CamelContext camelContext, Address address, AtomixClusterConfiguration configuration) {
+        super(null, camelContext);
+
+        this.address = address;
+        this.configuration = configuration.copy();
+    }
+
+    // **********************************
+    // Properties
+    // **********************************
+
+    public Address getAddress() {
+        return address;
+    }
+
+    public void setAddress(String address) {
+        this.address = new Address(address);
+    }
+
+    public void setAddress(Address address) {
+        this.address = address;
+    }
+
+    public AtomixClusterConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(AtomixClusterConfiguration configuration) {
+        this.configuration = configuration.copy();
+    }
+
+    public String getStoragePath() {
+        return configuration.getStoragePath();
+    }
+
+    public void setStoragePath(String storagePath) {
+        configuration.setStoragePath(storagePath);
+    }
+
+    public List<Address> getNodes() {
+        return configuration.getNodes();
+    }
+
+    public StorageLevel getStorageLevel() {
+        return configuration.getStorageLevel();
+    }
+
+    public void setNodes(List<Address> nodes) {
+        configuration.setNodes(nodes);
+    }
+
+    public void setStorageLevel(StorageLevel storageLevel) {
+        configuration.setStorageLevel(storageLevel);
+    }
+
+    public void setNodes(String nodes) {
+        configuration.setNodes(nodes);
+    }
+
+    public Class<? extends Transport> getTransport() {
+        return configuration.getTransport();
+    }
+
+    public void setTransport(Class<? extends Transport> transport) {
+        configuration.setTransport(transport);
+    }
+
+    public String getReplicaRef() {
+        return configuration.getReplicaRef();
+    }
+
+    public void setReplicaRef(String clusterref) {
+        configuration.setReplicaRef(clusterref);
+    }
+
+    public Atomix getReplica() {
+        return configuration.getReplica();
+    }
+
+    public void setReplica(AtomixReplica replica) {
+        configuration.setReplica(replica);
+    }
+
+    public String getConfigurationUri() {
+        return configuration.getConfigurationUri();
+    }
+
+    public void setConfigurationUri(String configurationUri) {
+        configuration.setConfigurationUri(configurationUri);
+    }
+
+    // *********************************************
+    // Lifecycle
+    // *********************************************
+
+    @Override
+    protected void doStart() throws Exception {
+        // Assume that if addresses are provided the cluster needs be bootstrapped.
+        if (ObjectHelper.isNotEmpty(configuration.getNodes())) {
+            LOGGER.debug("Bootstrap cluster on address {} for nodes: {}", address, configuration.getNodes());
+            getOrCreateAtomix().bootstrap(configuration.getNodes()).join();
+            LOGGER.debug("Bootstrap cluster done");
+        }
+
+        super.doStart();
+    }
+
+    @Override
+    protected AtomixClusterView createView(String namespace) throws Exception {
+        return new AtomixClusterView(this, namespace, getOrCreateAtomix());
+    }
+
+
+    private AtomixReplica getOrCreateAtomix() throws Exception {
+        if (atomix == null) {
+            // Validate parameters
+            ObjectHelper.notNull(getCamelContext(), "Camel Context");
+            ObjectHelper.notNull(address, "Atomix Node Address");
+            ObjectHelper.notNull(configuration, "Atomix Node Configuration");
+
+            atomix = AtomixClusterHelper.createReplica(getCamelContext(), address, configuration);
+
+            if (ObjectHelper.isNotEmpty(configuration.getNodes())) {
+                LOGGER.debug("Bootstrap cluster on address {} for nodes: {}", address, configuration.getNodes());
+                this.atomix.bootstrap(configuration.getNodes()).join();
+                LOGGER.debug("Bootstrap cluster done");
+            } else {
+                LOGGER.debug("Bootstrap cluster on address {}", address, configuration.getNodes());
+                this.atomix.bootstrap().join();
+                LOGGER.debug("Bootstrap cluster done");
+            }
+        }
+
+        return this.atomix;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/4d7fbb54/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
index bf1c4d1..5078508 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
@@ -24,10 +24,9 @@ import io.atomix.Atomix;
 import io.atomix.group.DistributedGroup;
 import io.atomix.group.GroupMember;
 import io.atomix.group.LocalMember;
-import io.atomix.group.election.Term;
 import org.apache.camel.ha.CamelClusterMember;
-import org.apache.camel.ha.CamelClusterView;
 import org.apache.camel.impl.ha.AbstractCamelClusterView;
+import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,7 +37,7 @@ public final class AtomixClusterView extends AbstractCamelClusterView {
     private final AtomixLocalMember localMember;
     private DistributedGroup group;
 
-    AtomixClusterView(AtomixCluster cluster, String namespace, Atomix atomix) {
+    AtomixClusterView(AtomixClusterService cluster, String namespace, Atomix atomix) {
         super(cluster, namespace);
 
         this.atomix = atomix;
@@ -84,7 +83,13 @@ public final class AtomixClusterView extends AbstractCamelClusterView {
             localMember.join();
 
             LOGGER.debug("Listen election events");
-            group.election().onElection(this::onElection);
+            group.election().onElection(term -> fireLeadershipChangedEvent(asCamelClusterMember(term.leader())));
+
+            LOGGER.debug("Listen join events");
+            group.onJoin(member -> fireMemberAddedEvent(asCamelClusterMember(member)));
+
+            LOGGER.debug("Listen leave events");
+            group.onLeave(member -> fireMemberRemovedEvent(asCamelClusterMember(member)));
         }
     }
 
@@ -93,10 +98,6 @@ public final class AtomixClusterView extends AbstractCamelClusterView {
         localMember.leave();
     }
 
-    private void onElection(Term term) {
-        fireEvent(CamelClusterView.Event.LEADERSHIP_CHANGED, asCamelClusterMember(term.leader()));
-    }
-
     // ***********************************************
     //
     // ***********************************************
@@ -122,7 +123,7 @@ public final class AtomixClusterView extends AbstractCamelClusterView {
                 return false;
             }
 
-            return group.election().term().leader().equals(member);
+            return member.equals(group.election().term().leader());
         }
 
         boolean hasJoined() {
@@ -131,8 +132,14 @@ public final class AtomixClusterView extends AbstractCamelClusterView {
 
         AtomixLocalMember join() throws ExecutionException, InterruptedException {
             if (member == null && group != null) {
-                LOGGER.debug("Joining group {}", group);
-                member = group.join().join();
+                String id = getClusterService().getId();
+                if (ObjectHelper.isNotEmpty(id)) {
+                    LOGGER.debug("Joining group: {}, with id: {}", group, id);
+                    member = group.join(id).join();
+                } else {
+                    LOGGER.debug("Joining group: {} ", group);
+                    member = group.join().join();
+                }
             }
 
             return this;

http://git-wip-us.apache.org/repos/asf/camel/blob/4d7fbb54/components/camel-atomix/src/main/resources/META-INF/services/org/apache/camel/ha/atomix
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/resources/META-INF/services/org/apache/camel/ha/atomix b/components/camel-atomix/src/main/resources/META-INF/services/org/apache/camel/ha/atomix
index 2330253..a80d8bf 100644
--- a/components/camel-atomix/src/main/resources/META-INF/services/org/apache/camel/ha/atomix
+++ b/components/camel-atomix/src/main/resources/META-INF/services/org/apache/camel/ha/atomix
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-class=org.apache.camel.component.atomix.ha.AtomixClusterFactory
+class=org.apache.camel.component.atomix.ha.AtomixClusterService

http://git-wip-us.apache.org/repos/asf/camel/blob/4d7fbb54/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyMain.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyMain.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyMain.java
deleted file mode 100644
index e0e253f..0000000
--- a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyMain.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.atomix.ha;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import io.atomix.catalyst.transport.Address;
-import io.atomix.copycat.server.storage.StorageLevel;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class AtomixRoutePolicyMain {
-    private static final Logger LOGGER = LoggerFactory.getLogger(AtomixRoutePolicyMain.class);
-
-    public static void main(final String[] args) throws Exception {
-        final Integer index = Integer.getInteger("atomix.index");
-        final String[] addresses = System.getProperty("atomix.cluster").split(",");
-
-        List<Address> nodes = new ArrayList<>();
-        for (int i = 0; i < addresses.length; i++) {
-            String[] parts = addresses[i].split(":");
-            nodes.add(new Address(parts[0], Integer.valueOf(parts[1])));
-        }
-
-        AtomixCluster cluster = new AtomixCluster();
-        cluster.setStorageLevel(StorageLevel.MEMORY);
-        cluster.setAddress(nodes.get(index));
-        cluster.setNodes(nodes);
-
-        DefaultCamelContext context = new DefaultCamelContext();
-        context.addService(cluster);
-        context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                fromF("timer:atomix-%d-1?period=2s", nodes.get(index).port())
-                    .log("${routeId} (1)");
-                fromF("timer:atomix-%d-2?period=5s", nodes.get(index).port())
-                    .log("${routeId} (2)");
-            }
-        });
-
-        context.start();
-
-        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-            try {
-                context.stop();
-            } catch (Exception e) {
-                LOGGER.warn("", e);
-            }
-        }));
-
-        for (int i = 0; i < Integer.MAX_VALUE; i++) {
-            Thread.sleep(1000);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/4d7fbb54/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyTest.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyTest.java
new file mode 100644
index 0000000..c70372a
--- /dev/null
+++ b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.ha;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import io.atomix.catalyst.transport.Address;
+import io.atomix.copycat.server.storage.StorageLevel;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
+import org.apache.camel.test.AvailablePortFinder;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class AtomixRoutePolicyTest {
+    private static final List<Address> ADDRESSES = Arrays.asList(
+        new Address("127.0.0.1", AvailablePortFinder.getNextAvailable()),
+        new Address("127.0.0.1", AvailablePortFinder.getNextAvailable()),
+        new Address("127.0.0.1", AvailablePortFinder.getNextAvailable())
+    );
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AtomixRoutePolicyTest.class);
+    private static final Set<Address> RESULTS = new HashSet<>();
+    private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(ADDRESSES.size() * 2);
+    private static final CountDownLatch LATCH = new CountDownLatch(ADDRESSES.size());
+
+    // ************************************
+    // Test
+    // ************************************
+
+    @Test
+    public void test() throws Exception {
+        for (Address address: ADDRESSES) {
+            SCHEDULER.submit(() -> run(address));
+        }
+
+        LATCH.await(1, TimeUnit.MINUTES);
+        SCHEDULER.shutdownNow();
+
+        Assert.assertEquals(ADDRESSES.size(), RESULTS.size());
+        Assert.assertTrue(RESULTS.containsAll(ADDRESSES));
+    }
+
+    // ************************************
+    // Run a Camel node
+    // ************************************
+
+    private static void run(Address address) {
+        try {
+            CountDownLatch contextLatch = new CountDownLatch(1);
+
+            AtomixClusterService service = new AtomixClusterService();
+            service.setId("node-" + address.port());
+            service.setStorageLevel(StorageLevel.MEMORY);
+            service.setAddress(address);
+            service.setNodes(ADDRESSES);
+
+            DefaultCamelContext context = new DefaultCamelContext();
+            context.disableJMX();
+            context.setName("context-" + address.port());
+            context.addService(service);
+            context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("timer:atomix?delay=1s&period=1s&repeatCount=1")
+                        .routeId("route-" + address.port())
+                        .process(e -> {
+                            LOGGER.debug("Node {} done", address);
+                            RESULTS.add(address);
+                            // Shutdown the context later on to give a chance to
+                            // other members to catch-up
+                            SCHEDULER.schedule(contextLatch::countDown, 2 + ThreadLocalRandom.current().nextInt(3), TimeUnit.SECONDS);
+                        });
+                }
+            });
+
+            // Start the context after some random time so the startup order
+            // changes for each test.
+            Thread.sleep(ThreadLocalRandom.current().nextInt(500));
+            context.start();
+
+            contextLatch.await();
+            context.stop();
+
+            LATCH.countDown();
+        } catch (Exception e) {
+            LOGGER.warn("", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/4d7fbb54/components/camel-atomix/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/test/resources/log4j2.properties b/components/camel-atomix/src/test/resources/log4j2.properties
index 1e650b1..d70cd9d 100644
--- a/components/camel-atomix/src/test/resources/log4j2.properties
+++ b/components/camel-atomix/src/test/resources/log4j2.properties
@@ -38,4 +38,4 @@ logger.camel-impl-ha.name = org.apache.camel.impl.ha
 logger.camel-impl-ha.level = DEBUG
 
 rootLogger.level = INFO
-rootLogger.appenderRef.file.ref = out
+rootLogger.appenderRef.file.ref = file


Mime
View raw message