camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acosent...@apache.org
Subject camel git commit: CAMEL-10494 Camel-Kubernetes: Consuming events from nodes
Date Fri, 18 Nov 2016 12:14:55 GMT
Repository: camel
Updated Branches:
  refs/heads/master 8fbddbd7f -> 88de1d433


CAMEL-10494 Camel-Kubernetes: Consuming events from nodes


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/88de1d43
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/88de1d43
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/88de1d43

Branch: refs/heads/master
Commit: 88de1d433fe92c6a7b9dc410aff4d813c6102727
Parents: 8fbddbd
Author: Andrea Cosentino <ancosen@gmail.com>
Authored: Fri Nov 18 11:41:50 2016 +0100
Committer: Andrea Cosentino <ancosen@gmail.com>
Committed: Fri Nov 18 11:41:50 2016 +0100

----------------------------------------------------------------------
 .../kubernetes/KubernetesEndpoint.java          |   4 +
 .../consumer/KubernetesNodesConsumer.java       | 118 ++++++++++++++++
 .../kubernetes/consumer/common/NodeEvent.java   |  47 +++++++
 .../consumer/KubernetesNodesConsumerTest.java   | 136 +++++++++++++++++++
 4 files changed, 305 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/88de1d43/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java
index e520699..2934b64 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java
@@ -26,6 +26,7 @@ import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.component.kubernetes.consumer.KubernetesNamespacesConsumer;
+import org.apache.camel.component.kubernetes.consumer.KubernetesNodesConsumer;
 import org.apache.camel.component.kubernetes.consumer.KubernetesPodsConsumer;
 import org.apache.camel.component.kubernetes.consumer.KubernetesReplicationControllersConsumer;
 import org.apache.camel.component.kubernetes.consumer.KubernetesServicesConsumer;
@@ -141,6 +142,9 @@ public class KubernetesEndpoint extends DefaultEndpoint {
                 
             case KubernetesCategory.NAMESPACES:
                 return new KubernetesNamespacesConsumer(this, processor);
+                
+            case KubernetesCategory.NODES:
+                return new KubernetesNodesConsumer(this, processor);
 
             default:
                 throw new IllegalArgumentException("The " + category + " consumer category
doesn't exist");

http://git-wip-us.apache.org/repos/asf/camel/blob/88de1d43/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNodesConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNodesConsumer.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNodesConsumer.java
new file mode 100644
index 0000000..f2a1d55
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNodesConsumer.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.consumer;
+
+import java.util.concurrent.ExecutorService;
+
+import io.fabric8.kubernetes.api.model.DoneableNode;
+import io.fabric8.kubernetes.api.model.Node;
+import io.fabric8.kubernetes.api.model.NodeList;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.dsl.ClientNonNamespaceOperation;
+import io.fabric8.kubernetes.client.dsl.ClientResource;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.kubernetes.KubernetesConstants;
+import org.apache.camel.component.kubernetes.KubernetesEndpoint;
+import org.apache.camel.component.kubernetes.consumer.common.NodeEvent;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KubernetesNodesConsumer extends DefaultConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesNodesConsumer.class);
+
+    private final Processor processor;
+    private ExecutorService executor;
+
+    public KubernetesNodesConsumer(KubernetesEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+        this.processor = processor;
+    }
+
+    @Override
+    public KubernetesEndpoint getEndpoint() {
+        return (KubernetesEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        executor = getEndpoint().createExecutor();
+
+        executor.submit(new NodesConsumerTask());
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+
+        LOG.debug("Stopping Kubernetes Nodes Consumer");
+        if (executor != null) {
+            if (getEndpoint() != null && getEndpoint().getCamelContext() != null)
{
+                getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+            } else {
+                executor.shutdownNow();
+            }
+        }
+        executor = null;
+    }
+
+    class NodesConsumerTask implements Runnable {
+        
+        @Override
+        public void run() {
+            ClientNonNamespaceOperation<Node, NodeList, DoneableNode, ClientResource<Node,
DoneableNode>> w = getEndpoint().getKubernetesClient().nodes();
+            if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getLabelKey())

+                && ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getLabelValue()))
{
+                w.withLabel(getEndpoint().getKubernetesConfiguration().getLabelKey(), getEndpoint().getKubernetesConfiguration().getLabelValue());
+            }
+            if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getResourceName()))
{
+                w.withName(getEndpoint().getKubernetesConfiguration().getResourceName());
+            }
+            w.watch(new Watcher<Node>() {
+
+                @Override
+                public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
+                    Node resource) {
+                    NodeEvent ne = new NodeEvent(action, resource);
+                    Exchange exchange = getEndpoint().createExchange();
+                    exchange.getIn().setBody(ne.getNode());
+                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION,
ne.getAction());
+                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP,
System.currentTimeMillis());
+                    try {
+                        processor.process(exchange);
+                    } catch (Exception e) {
+                        getExceptionHandler().handleException("Error during processing",
exchange, e);
+                    }
+                }
+
+                @Override
+                public void onClose(KubernetesClientException cause) {
+                    if (cause != null) {
+                        LOG.error(cause.getMessage(), cause);
+                    }
+
+                }
+            });
+        } 
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/88de1d43/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/NodeEvent.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/NodeEvent.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/NodeEvent.java
new file mode 100644
index 0000000..9824c43
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/NodeEvent.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.consumer.common;
+
+import io.fabric8.kubernetes.api.model.Node;
+import io.fabric8.kubernetes.client.Watcher.Action;
+
+public class NodeEvent {
+    private io.fabric8.kubernetes.client.Watcher.Action action;
+
+    private Node node;
+
+    public NodeEvent(Action action, Node node) {
+        this.action = action;
+        this.node = node;
+    }
+
+    public io.fabric8.kubernetes.client.Watcher.Action getAction() {
+        return action;
+    }
+
+    public void setAction(io.fabric8.kubernetes.client.Watcher.Action action) {
+        this.action = action;
+    }
+
+    public Node getNode() {
+        return node;
+    }
+
+    public void setNode(Node node) {
+        this.node = node;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/88de1d43/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNodesConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNodesConsumerTest.java
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNodesConsumerTest.java
new file mode 100644
index 0000000..0d416aa
--- /dev/null
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNodesConsumerTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.consumer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerPort;
+import io.fabric8.kubernetes.api.model.Node;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodSpec;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kubernetes.KubernetesConstants;
+import org.apache.camel.component.kubernetes.KubernetesTestSupport;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.util.ObjectHelper;
+import org.junit.Test;
+
+public class KubernetesNodesConsumerTest extends KubernetesTestSupport {
+
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint mockResultEndpoint;
+
+    @Test
+    public void createAndDeletePod() throws Exception {
+        if (ObjectHelper.isEmpty(authToken)) {
+            return;
+        }
+
+        mockResultEndpoint.expectedMessageCount(1);
+        mockResultEndpoint.expectedHeaderValuesReceivedInAnyOrder(KubernetesConstants.KUBERNETES_EVENT_ACTION,
"MODIFIED");
+        Exchange ex = template.request("direct:createPod", new Processor() {
+
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_NAMESPACE_NAME,
"default");
+                exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_POD_NAME, "test");
+                Map<String, String> labels = new HashMap<String, String>();
+                labels.put("this", "rocks");
+                exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_PODS_LABELS, labels);
+                PodSpec podSpec = new PodSpec();
+                podSpec.setHost("172.28.128.4");
+                Container cont = new Container();
+                cont.setImage("docker.io/jboss/wildfly:latest");
+                cont.setName("pippo");
+
+                List<ContainerPort> containerPort = new ArrayList<ContainerPort>();
+                ContainerPort port = new ContainerPort();
+                port.setHostIP("0.0.0.0");
+                port.setHostPort(8080);
+                port.setContainerPort(8080);
+
+                containerPort.add(port);
+
+                cont.setPorts(containerPort);
+
+                List<Container> list = new ArrayList<Container>();
+                list.add(cont);
+
+                podSpec.setContainers(list);
+
+                exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_POD_SPEC, podSpec);
+            }
+        });
+        
+        ex = template.request("direct:deletePod", new Processor() {
+
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_NAMESPACE_NAME,
"default");
+                exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_POD_NAME, "test");
+            }
+        });
+
+        boolean podDeleted = ex.getOut().getBody(Boolean.class);
+
+        assertTrue(podDeleted);
+
+        Thread.sleep(3000);
+
+        mockResultEndpoint.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:list").toF("kubernetes://%s?oauthToken=%s&category=pods&operation=listPods",
host,
+                        authToken);
+                from("direct:listByLabels")
+                        .toF("kubernetes://%s?oauthToken=%s&category=pods&operation=listPodsByLabels",
host, authToken);
+                from("direct:getPod").toF("kubernetes://%s?oauthToken=%s&category=pods&operation=getPod",
host,
+                        authToken);
+                from("direct:createPod").toF("kubernetes://%s?oauthToken=%s&category=pods&operation=createPod",
host,
+                        authToken);
+                from("direct:deletePod").toF("kubernetes://%s?oauthToken=%s&category=pods&operation=deletePod",
host,
+                        authToken);
+                fromF("kubernetes://%s?oauthToken=%s&category=nodes&resourceName=minikube",
host, authToken)
+                        .process(new KubernertesProcessor()).to(mockResultEndpoint);
+            }
+        };
+    }
+
+    public class KubernertesProcessor implements Processor {
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            Message in = exchange.getIn();
+            Node node = exchange.getIn().getBody(Node.class);
+            log.info("Got event with node name: " + node.getMetadata().getName() + " and
action "
+                    + in.getHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION));
+        }
+    }
+}


Mime
View raw message