camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acosent...@apache.org
Subject [07/10] camel git commit: CAMEL-11243 Split camel-kubernates component into 13 (build-configs, builds, configmaps, namespaces, nodes, persistent-volumes, persistent-volumes-claims, pods, replication-controllers, resources-quota, secrets, service-accounts
Date Mon, 29 May 2017 07:34:49 GMT
http://git-wip-us.apache.org/repos/asf/camel/blob/89eee3b9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/namespaces/KubernetesNamespacesProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/namespaces/KubernetesNamespacesProducer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/namespaces/KubernetesNamespacesProducer.java
new file mode 100644
index 0000000..0a76126
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/namespaces/KubernetesNamespacesProducer.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.namespaces;
+
+import java.util.Map;
+
+import io.fabric8.kubernetes.api.model.DoneableNamespace;
+import io.fabric8.kubernetes.api.model.Namespace;
+import io.fabric8.kubernetes.api.model.NamespaceBuilder;
+import io.fabric8.kubernetes.api.model.NamespaceList;
+import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
+import io.fabric8.kubernetes.client.dsl.Resource;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.kubernetes.AbstractKubernetesEndpoint;
+import org.apache.camel.component.kubernetes.KubernetesConstants;
+import org.apache.camel.component.kubernetes.KubernetesOperations;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.MessageHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KubernetesNamespacesProducer extends DefaultProducer {
+
+    private static final Logger LOG = LoggerFactory
+            .getLogger(KubernetesNamespacesProducer.class);
+
+    public KubernetesNamespacesProducer(AbstractKubernetesEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    @Override
+    public AbstractKubernetesEndpoint getEndpoint() {
+        return (AbstractKubernetesEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        String operation;
+
+        if (ObjectHelper.isEmpty(getEndpoint().getKubernetesConfiguration()
+                .getOperation())) {
+            operation = exchange.getIn().getHeader(
+                    KubernetesConstants.KUBERNETES_OPERATION, String.class);
+        } else {
+            operation = getEndpoint().getKubernetesConfiguration()
+                    .getOperation();
+        }
+
+        switch (operation) {
+
+        case KubernetesOperations.LIST_NAMESPACE_OPERATION:
+            doList(exchange, operation);
+            break;
+
+        case KubernetesOperations.LIST_NAMESPACE_BY_LABELS_OPERATION:
+            doListNamespaceByLabel(exchange, operation);
+            break;
+
+        case KubernetesOperations.GET_NAMESPACE_OPERATION:
+            doGetNamespace(exchange, operation);
+            break;
+
+        case KubernetesOperations.CREATE_NAMESPACE_OPERATION:
+            doCreateNamespace(exchange, operation);
+            break;
+
+        case KubernetesOperations.DELETE_NAMESPACE_OPERATION:
+            doDeleteNamespace(exchange, operation);
+            break;
+
+        default:
+            throw new IllegalArgumentException("Unsupported operation "
+                    + operation);
+        }
+    }
+
+    protected void doList(Exchange exchange, String operation) throws Exception {
+        NamespaceList namespacesList = getEndpoint().getKubernetesClient()
+                .namespaces().list();
+        exchange.getOut().setBody(namespacesList.getItems());
+    }
+
+    protected void doListNamespaceByLabel(Exchange exchange, String operation) {
+        Map<String, String> labels = exchange.getIn().getHeader(
+                KubernetesConstants.KUBERNETES_NAMESPACE_LABELS, Map.class);
+        if (ObjectHelper.isEmpty(labels)) {
+            LOG.error("Get a specific namespace by labels require specify a labels set");
+            throw new IllegalArgumentException(
+                    "Get a specific namespace by labels require specify a labels set");
+        }
+        NonNamespaceOperation<Namespace, NamespaceList, DoneableNamespace, Resource<Namespace, DoneableNamespace>> namespaces = getEndpoint().getKubernetesClient().namespaces();
+        for (Map.Entry<String, String> entry : labels.entrySet()) {
+            namespaces.withLabel(entry.getKey(), entry.getValue());
+        }
+        NamespaceList namespace = namespaces.list();
+        
+        MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), true);
+        exchange.getOut().setBody(namespace.getItems());
+    }
+
+    protected void doGetNamespace(Exchange exchange, String operation) {
+        String namespaceName = exchange.getIn().getHeader(
+                KubernetesConstants.KUBERNETES_NAMESPACE_NAME, String.class);
+        if (ObjectHelper.isEmpty(namespaceName)) {
+            LOG.error("Get a specific namespace require specify a namespace name");
+            throw new IllegalArgumentException(
+                    "Get a specific namespace require specify a namespace name");
+        }
+        Namespace namespace = getEndpoint().getKubernetesClient().namespaces()
+                .withName(namespaceName).get();
+        
+        MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), true);
+        exchange.getOut().setBody(namespace);
+    }
+
+    protected void doCreateNamespace(Exchange exchange, String operation) {
+        String namespaceName = exchange.getIn().getHeader(
+                KubernetesConstants.KUBERNETES_NAMESPACE_NAME, String.class);
+        if (ObjectHelper.isEmpty(namespaceName)) {
+            LOG.error("Create a specific namespace require specify a namespace name");
+            throw new IllegalArgumentException(
+                    "Create a specific namespace require specify a namespace name");
+        }
+        Map<String, String> labels = exchange.getIn().getHeader(
+                KubernetesConstants.KUBERNETES_NAMESPACE_LABELS, Map.class);
+        Namespace ns = new NamespaceBuilder().withNewMetadata()
+                .withName(namespaceName).withLabels(labels).endMetadata()
+                .build();
+        Namespace namespace = getEndpoint().getKubernetesClient().namespaces()
+                .create(ns);
+        
+        MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), true);
+        exchange.getOut().setBody(namespace);
+    }
+
+    protected void doDeleteNamespace(Exchange exchange, String operation) {
+        String namespaceName = exchange.getIn().getHeader(
+                KubernetesConstants.KUBERNETES_NAMESPACE_NAME, String.class);
+        if (ObjectHelper.isEmpty(namespaceName)) {
+            LOG.error("Delete a specific namespace require specify a namespace name");
+            throw new IllegalArgumentException(
+                    "Delete a specific namespace require specify a namespace name");
+        }
+        Boolean namespace = getEndpoint().getKubernetesClient().namespaces()
+                .withName(namespaceName).delete();
+        
+        MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), true);
+        exchange.getOut().setBody(namespace);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/89eee3b9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesComponent.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesComponent.java
new file mode 100644
index 0000000..81d9a12
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesComponent.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.nodes;
+
+import org.apache.camel.component.kubernetes.AbstractKubernetesComponent;
+import org.apache.camel.component.kubernetes.KubernetesConfiguration;
+
+public class KubernetesNodesComponent extends AbstractKubernetesComponent {
+
+    protected KubernetesNodesEndpoint doCreateEndpoint(String uri, String remaining, KubernetesConfiguration config) throws Exception {
+        KubernetesNodesEndpoint endpoint = new KubernetesNodesEndpoint(uri, this, config);
+        return endpoint;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/89eee3b9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesConsumer.java
new file mode 100644
index 0000000..b269845
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesConsumer.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.nodes;
+
+import java.util.concurrent.ExecutorService;
+
+import io.fabric8.kubernetes.api.model.DoneableNode;
+import io.fabric8.kubernetes.api.model.Node;
+import io.fabric8.kubernetes.api.model.NodeList;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watch;
+import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
+import io.fabric8.kubernetes.client.dsl.Resource;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.kubernetes.AbstractKubernetesEndpoint;
+import org.apache.camel.component.kubernetes.KubernetesConstants;
+import org.apache.camel.component.kubernetes.KubernetesEndpoint;
+import org.apache.camel.component.kubernetes.consumer.common.NodeEvent;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KubernetesNodesConsumer extends DefaultConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesNodesConsumer.class);
+
+    private final Processor processor;
+    private ExecutorService executor;
+    private NodesConsumerTask nodesWatcher;
+
+    public KubernetesNodesConsumer(AbstractKubernetesEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+        this.processor = processor;
+    }
+
+    @Override
+    public AbstractKubernetesEndpoint getEndpoint() {
+        return (AbstractKubernetesEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        executor = getEndpoint().createExecutor();
+
+        nodesWatcher = new NodesConsumerTask();
+        executor.submit(nodesWatcher);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+
+        LOG.debug("Stopping Kubernetes Nodes Consumer");
+        if (executor != null) {
+            if (getEndpoint() != null && getEndpoint().getCamelContext() != null) {
+                if (nodesWatcher != null) {
+                    nodesWatcher.getWatch().close();
+                }
+                getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+            } else {
+                if (nodesWatcher != null) {
+                    nodesWatcher.getWatch().close();
+                }
+                executor.shutdownNow();
+            }
+        }
+        executor = null;
+    }
+
+    class NodesConsumerTask implements Runnable {
+        
+        private Watch watch;
+        
+        @Override
+        public void run() {
+            NonNamespaceOperation<Node, NodeList, DoneableNode, Resource<Node, DoneableNode>> w = getEndpoint().getKubernetesClient().nodes();
+            if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getLabelKey()) 
+                && ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getLabelValue())) {
+                w.withLabel(getEndpoint().getKubernetesConfiguration().getLabelKey(), getEndpoint().getKubernetesConfiguration().getLabelValue());
+            }
+            if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getResourceName())) {
+                w.withName(getEndpoint().getKubernetesConfiguration().getResourceName());
+            }
+            watch = w.watch(new Watcher<Node>() {
+
+                @Override
+                public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
+                    Node resource) {
+                    NodeEvent ne = new NodeEvent(action, resource);
+                    Exchange exchange = getEndpoint().createExchange();
+                    exchange.getIn().setBody(ne.getNode());
+                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, ne.getAction());
+                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
+                    try {
+                        processor.process(exchange);
+                    } catch (Exception e) {
+                        getExceptionHandler().handleException("Error during processing", exchange, e);
+                    }
+                }
+
+                @Override
+                public void onClose(KubernetesClientException cause) {
+                    if (cause != null) {
+                        LOG.error(cause.getMessage(), cause);
+                    }
+
+                }
+            });
+        }
+       
+        public Watch getWatch() {
+            return watch;
+        }
+
+        public void setWatch(Watch watch) {
+            this.watch = watch;
+        } 
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/89eee3b9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesEndpoint.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesEndpoint.java
new file mode 100644
index 0000000..9302905
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesEndpoint.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.nodes;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.kubernetes.AbstractKubernetesEndpoint;
+import org.apache.camel.component.kubernetes.KubernetesConfiguration;
+import org.apache.camel.component.kubernetes.build_configs.KubernetesBuildConfigsProducer;
+import org.apache.camel.component.kubernetes.builds.KubernetesBuildsProducer;
+import org.apache.camel.component.kubernetes.config_maps.KubernetesConfigMapsProducer;
+import org.apache.camel.component.kubernetes.namespaces.KubernetesNamespacesConsumer;
+import org.apache.camel.component.kubernetes.namespaces.KubernetesNamespacesProducer;
+import org.apache.camel.component.kubernetes.persistent_volumes.KubernetesPersistentVolumesProducer;
+import org.apache.camel.component.kubernetes.persistent_volumes_claims.KubernetesPersistentVolumesClaimsProducer;
+import org.apache.camel.component.kubernetes.pods.KubernetesPodsConsumer;
+import org.apache.camel.component.kubernetes.pods.KubernetesPodsProducer;
+import org.apache.camel.component.kubernetes.replication_controllers.KubernetesReplicationControllersConsumer;
+import org.apache.camel.component.kubernetes.replication_controllers.KubernetesReplicationControllersProducer;
+import org.apache.camel.component.kubernetes.resources_quota.KubernetesResourcesQuotaProducer;
+import org.apache.camel.component.kubernetes.secrets.KubernetesSecretsProducer;
+import org.apache.camel.component.kubernetes.service_accounts.KubernetesServiceAccountsProducer;
+import org.apache.camel.component.kubernetes.services.KubernetesServicesConsumer;
+import org.apache.camel.component.kubernetes.services.KubernetesServicesProducer;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Kubernetes Nodes component provides a producer to execute kubernetes node operations
+ * and a consumer to consume node events.
+ */
+@UriEndpoint(firstVersion = "2.17.0", scheme = "kubernetes-nodes", title = "Kubernetes Nodes",
+    syntax = "kubernetes-nodes:masterUrl", consumerClass = KubernetesNodesConsumer.class, label = "container,cloud,paas")
+public class KubernetesNodesEndpoint extends AbstractKubernetesEndpoint {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesNodesEndpoint.class);
+
+    public KubernetesNodesEndpoint(String uri, KubernetesNodesComponent component, KubernetesConfiguration config) {
+        super(uri, component, config);
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new KubernetesNodesProducer(this);
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new KubernetesNodesConsumer(this, processor);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/89eee3b9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesProducer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesProducer.java
new file mode 100644
index 0000000..f99b27d
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesProducer.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.nodes;
+
+import java.util.Map;
+
+import io.fabric8.kubernetes.api.model.DoneableNode;
+import io.fabric8.kubernetes.api.model.Node;
+import io.fabric8.kubernetes.api.model.NodeList;
+import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
+import io.fabric8.kubernetes.client.dsl.Resource;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.kubernetes.AbstractKubernetesEndpoint;
+import org.apache.camel.component.kubernetes.KubernetesConstants;
+import org.apache.camel.component.kubernetes.KubernetesEndpoint;
+import org.apache.camel.component.kubernetes.KubernetesOperations;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.MessageHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KubernetesNodesProducer extends DefaultProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesNodesProducer.class);
+
+    public KubernetesNodesProducer(AbstractKubernetesEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    @Override
+    public AbstractKubernetesEndpoint getEndpoint() {
+        return (AbstractKubernetesEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        String operation;
+
+        if (ObjectHelper.isEmpty(getEndpoint().getKubernetesConfiguration().getOperation())) {
+            operation = exchange.getIn().getHeader(KubernetesConstants.KUBERNETES_OPERATION, String.class);
+        } else {
+            operation = getEndpoint().getKubernetesConfiguration().getOperation();
+        }
+
+        switch (operation) {
+
+        case KubernetesOperations.LIST_NODES:
+            doList(exchange, operation);
+            break;
+
+        case KubernetesOperations.LIST_NODES_BY_LABELS_OPERATION:
+            doListNodesByLabels(exchange, operation);
+            break;
+
+        case KubernetesOperations.GET_NODE_OPERATION:
+            doGetNode(exchange, operation);
+            break;
+
+        default:
+            throw new IllegalArgumentException("Unsupported operation " + operation);
+        }
+    }
+
+    protected void doList(Exchange exchange, String operation) throws Exception {
+        NodeList nodeList = getEndpoint().getKubernetesClient().nodes().list();
+        
+        MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), true);
+        exchange.getOut().setBody(nodeList.getItems());
+    }
+
+    protected void doListNodesByLabels(Exchange exchange, String operation) throws Exception {
+        NodeList nodeList = null;
+        Map<String, String> labels = exchange.getIn().getHeader(KubernetesConstants.KUBERNETES_NODES_LABELS, Map.class);
+        NonNamespaceOperation<Node, NodeList, DoneableNode, Resource<Node, DoneableNode>> nodes = getEndpoint().getKubernetesClient().nodes();
+        for (Map.Entry<String, String> entry : labels.entrySet()) {
+            nodes.withLabel(entry.getKey(), entry.getValue());
+        }
+        nodeList = nodes.list();
+        
+        MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), true);
+        exchange.getOut().setBody(nodeList.getItems());
+    }
+
+    protected void doGetNode(Exchange exchange, String operation) throws Exception {
+        Node node = null;
+        String pvName = exchange.getIn().getHeader(KubernetesConstants.KUBERNETES_NODE_NAME, String.class);
+        if (ObjectHelper.isEmpty(pvName)) {
+            LOG.error("Get a specific Node require specify a Node name");
+            throw new IllegalArgumentException("Get a specific Node require specify a Node name");
+        }
+        node = getEndpoint().getKubernetesClient().nodes().withName(pvName).get();
+
+        MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), true);
+        exchange.getOut().setBody(node);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/89eee3b9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/persistent_volumes/KubernetesPersistentVolumesComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/persistent_volumes/KubernetesPersistentVolumesComponent.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/persistent_volumes/KubernetesPersistentVolumesComponent.java
new file mode 100644
index 0000000..ad3398e
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/persistent_volumes/KubernetesPersistentVolumesComponent.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.persistent_volumes;
+
+import org.apache.camel.component.kubernetes.AbstractKubernetesComponent;
+import org.apache.camel.component.kubernetes.KubernetesConfiguration;
+
+public class KubernetesPersistentVolumesComponent extends AbstractKubernetesComponent {
+
+    protected KubernetesPersistentVolumesEndpoint doCreateEndpoint(String uri, String remaining, KubernetesConfiguration config) throws Exception {
+        KubernetesPersistentVolumesEndpoint endpoint = new KubernetesPersistentVolumesEndpoint(uri, this, config);
+        return endpoint;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/89eee3b9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/persistent_volumes/KubernetesPersistentVolumesEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/persistent_volumes/KubernetesPersistentVolumesEndpoint.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/persistent_volumes/KubernetesPersistentVolumesEndpoint.java
new file mode 100644
index 0000000..316796f
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/persistent_volumes/KubernetesPersistentVolumesEndpoint.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.persistent_volumes;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.kubernetes.AbstractKubernetesEndpoint;
+import org.apache.camel.component.kubernetes.KubernetesConfiguration;
+import org.apache.camel.spi.UriEndpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Kubernetes Persistent Volumes component provides a producer to execute kubernetes persistent volume operations.
+ */
+@UriEndpoint(firstVersion = "2.17.0", scheme = "kubernetes-persistent-volumes", title = "Kubernetes Persistent Volume",
+    syntax = "kubernetes-persistent-volumes:masterUrl", producerOnly = true, label = "container,cloud,paas")
+public class KubernetesPersistentVolumesEndpoint extends AbstractKubernetesEndpoint {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesPersistentVolumesEndpoint.class);
+
+    public KubernetesPersistentVolumesEndpoint(String uri, KubernetesPersistentVolumesComponent component, KubernetesConfiguration config) {
+        super(uri, component, config);
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new KubernetesPersistentVolumesProducer(this);
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        throw new IllegalArgumentException("The kubernetes-persistent-volumes doesn't support consumer");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/89eee3b9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/persistent_volumes/KubernetesPersistentVolumesProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/persistent_volumes/KubernetesPersistentVolumesProducer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/persistent_volumes/KubernetesPersistentVolumesProducer.java
new file mode 100644
index 0000000..4169acf
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/persistent_volumes/KubernetesPersistentVolumesProducer.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.persistent_volumes;
+
+import java.util.Map;
+
+import io.fabric8.kubernetes.api.model.DoneablePersistentVolume;
+import io.fabric8.kubernetes.api.model.PersistentVolume;
+import io.fabric8.kubernetes.api.model.PersistentVolumeList;
+import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
+import io.fabric8.kubernetes.client.dsl.Resource;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.kubernetes.AbstractKubernetesEndpoint;
+import org.apache.camel.component.kubernetes.KubernetesConstants;
+import org.apache.camel.component.kubernetes.KubernetesOperations;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.MessageHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KubernetesPersistentVolumesProducer extends DefaultProducer {
+
+    private static final Logger LOG = LoggerFactory
+            .getLogger(KubernetesPersistentVolumesProducer.class);
+
+    public KubernetesPersistentVolumesProducer(AbstractKubernetesEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    @Override
+    public AbstractKubernetesEndpoint getEndpoint() {
+        return (AbstractKubernetesEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        String operation;
+
+        if (ObjectHelper.isEmpty(getEndpoint().getKubernetesConfiguration()
+                .getOperation())) {
+            operation = exchange.getIn().getHeader(
+                    KubernetesConstants.KUBERNETES_OPERATION, String.class);
+        } else {
+            operation = getEndpoint().getKubernetesConfiguration()
+                    .getOperation();
+        }
+
+        switch (operation) {
+
+        case KubernetesOperations.LIST_PERSISTENT_VOLUMES:
+            doList(exchange, operation);
+            break;
+
+        case KubernetesOperations.LIST_PERSISTENT_VOLUMES_BY_LABELS_OPERATION:
+            doListPersistentVolumesByLabels(exchange, operation);
+            break;
+
+        case KubernetesOperations.GET_PERSISTENT_VOLUME_OPERATION:
+            doGetPersistentVolume(exchange, operation);
+            break;
+
+        default:
+            throw new IllegalArgumentException("Unsupported operation "
+                    + operation);
+        }
+    }
+
+    protected void doList(Exchange exchange, String operation) throws Exception {
+        PersistentVolumeList persistentVolumeList = getEndpoint()
+                .getKubernetesClient().persistentVolumes().list();
+        
+        MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), true);
+        exchange.getOut().setBody(persistentVolumeList.getItems());
+    }
+
+    protected void doListPersistentVolumesByLabels(Exchange exchange,
+            String operation) throws Exception {
+        PersistentVolumeList pvList = null;
+        Map<String, String> labels = exchange.getIn().getHeader(
+                KubernetesConstants.KUBERNETES_PERSISTENT_VOLUMES_LABELS,
+                Map.class);
+        NonNamespaceOperation<PersistentVolume, PersistentVolumeList, DoneablePersistentVolume, Resource<PersistentVolume, DoneablePersistentVolume>> pvs; 
+        pvs = getEndpoint().getKubernetesClient().persistentVolumes();
+        for (Map.Entry<String, String> entry : labels.entrySet()) {
+            pvs.withLabel(entry.getKey(), entry.getValue());
+        }
+        pvList = pvs.list();
+        
+        MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), true);
+        exchange.getOut().setBody(pvList.getItems());
+    }
+
+    protected void doGetPersistentVolume(Exchange exchange, String operation)
+            throws Exception {
+        PersistentVolume pv = null;
+        String pvName = exchange.getIn().getHeader(
+                KubernetesConstants.KUBERNETES_PERSISTENT_VOLUME_NAME,
+                String.class);
+        if (ObjectHelper.isEmpty(pvName)) {
+            LOG.error("Get a specific Persistent Volume require specify a Persistent Volume name");
+            throw new IllegalArgumentException(
+                    "Get a specific Persistent Volume require specify a Persistent Volume name");
+        }
+        pv = getEndpoint().getKubernetesClient().persistentVolumes().withName(pvName).get();
+        
+        MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), true);
+        exchange.getOut().setBody(pv);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/89eee3b9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/persistent_volumes_claims/KubernetesPersistentVolumesClaimsComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/persistent_volumes_claims/KubernetesPersistentVolumesClaimsComponent.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/persistent_volumes_claims/KubernetesPersistentVolumesClaimsComponent.java
new file mode 100644
index 0000000..d1194a3
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/persistent_volumes_claims/KubernetesPersistentVolumesClaimsComponent.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.persistent_volumes_claims;
+
+import org.apache.camel.component.kubernetes.AbstractKubernetesComponent;
+import org.apache.camel.component.kubernetes.KubernetesConfiguration;
+
+public class KubernetesPersistentVolumesClaimsComponent extends AbstractKubernetesComponent {
+
+    protected KubernetesPersistentVolumesClaimsEndpoint doCreateEndpoint(String uri, String remaining, KubernetesConfiguration config) throws Exception {
+        KubernetesPersistentVolumesClaimsEndpoint endpoint = new KubernetesPersistentVolumesClaimsEndpoint(uri, this, config);
+        return endpoint;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/89eee3b9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/persistent_volumes_claims/KubernetesPersistentVolumesClaimsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/persistent_volumes_claims/KubernetesPersistentVolumesClaimsEndpoint.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/persistent_volumes_claims/KubernetesPersistentVolumesClaimsEndpoint.java
new file mode 100644
index 0000000..a6e94ea
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/persistent_volumes_claims/KubernetesPersistentVolumesClaimsEndpoint.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.persistent_volumes_claims;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.kubernetes.AbstractKubernetesEndpoint;
+import org.apache.camel.component.kubernetes.KubernetesConfiguration;
+import org.apache.camel.component.kubernetes.build_configs.KubernetesBuildConfigsProducer;
+import org.apache.camel.component.kubernetes.builds.KubernetesBuildsProducer;
+import org.apache.camel.component.kubernetes.config_maps.KubernetesConfigMapsProducer;
+import org.apache.camel.component.kubernetes.namespaces.KubernetesNamespacesConsumer;
+import org.apache.camel.component.kubernetes.namespaces.KubernetesNamespacesProducer;
+import org.apache.camel.component.kubernetes.nodes.KubernetesNodesConsumer;
+import org.apache.camel.component.kubernetes.nodes.KubernetesNodesProducer;
+import org.apache.camel.component.kubernetes.persistent_volumes.KubernetesPersistentVolumesProducer;
+import org.apache.camel.component.kubernetes.pods.KubernetesPodsConsumer;
+import org.apache.camel.component.kubernetes.pods.KubernetesPodsProducer;
+import org.apache.camel.component.kubernetes.replication_controllers.KubernetesReplicationControllersConsumer;
+import org.apache.camel.component.kubernetes.replication_controllers.KubernetesReplicationControllersProducer;
+import org.apache.camel.component.kubernetes.resources_quota.KubernetesResourcesQuotaProducer;
+import org.apache.camel.component.kubernetes.secrets.KubernetesSecretsProducer;
+import org.apache.camel.component.kubernetes.service_accounts.KubernetesServiceAccountsProducer;
+import org.apache.camel.component.kubernetes.services.KubernetesServicesConsumer;
+import org.apache.camel.component.kubernetes.services.KubernetesServicesProducer;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Kubernetes Persistent Volumes Claims component provides a producer to execute kubernetes persistent volume claim operations.
+ */
+@UriEndpoint(firstVersion = "2.17.0", scheme = "kubernetes-persistent-volumes-claims", title = "Kubernetes Persistent Volume Claim",
+    syntax = "kubernetes-persistent-volumes-claims:masterUrl", producerOnly = true, label = "container,cloud,paas")
+public class KubernetesPersistentVolumesClaimsEndpoint extends AbstractKubernetesEndpoint {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesPersistentVolumesClaimsEndpoint.class);
+
+    public KubernetesPersistentVolumesClaimsEndpoint(String uri, KubernetesPersistentVolumesClaimsComponent component, KubernetesConfiguration config) {
+        super(uri, component, config);
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new KubernetesPersistentVolumesClaimsProducer(this);
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        throw new IllegalArgumentException("The kubernetes-persistent-volumes-claims doesn't exist");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/89eee3b9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/persistent_volumes_claims/KubernetesPersistentVolumesClaimsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/persistent_volumes_claims/KubernetesPersistentVolumesClaimsProducer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/persistent_volumes_claims/KubernetesPersistentVolumesClaimsProducer.java
new file mode 100644
index 0000000..98d18bb
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/persistent_volumes_claims/KubernetesPersistentVolumesClaimsProducer.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.persistent_volumes_claims;
+
+import java.util.Map;
+
+import io.fabric8.kubernetes.api.model.DoneablePersistentVolumeClaim;
+import io.fabric8.kubernetes.api.model.PersistentVolumeClaim;
+import io.fabric8.kubernetes.api.model.PersistentVolumeClaimBuilder;
+import io.fabric8.kubernetes.api.model.PersistentVolumeClaimList;
+import io.fabric8.kubernetes.api.model.PersistentVolumeClaimSpec;
+import io.fabric8.kubernetes.client.dsl.MixedOperation;
+import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
+import io.fabric8.kubernetes.client.dsl.Resource;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.kubernetes.AbstractKubernetesEndpoint;
+import org.apache.camel.component.kubernetes.KubernetesConstants;
+import org.apache.camel.component.kubernetes.KubernetesOperations;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.MessageHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KubernetesPersistentVolumesClaimsProducer extends DefaultProducer {
+
+    private static final Logger LOG = LoggerFactory
+            .getLogger(KubernetesPersistentVolumesClaimsProducer.class);
+
+    public KubernetesPersistentVolumesClaimsProducer(AbstractKubernetesEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    @Override
+    public AbstractKubernetesEndpoint getEndpoint() {
+        return (AbstractKubernetesEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        String operation;
+
+        if (ObjectHelper.isEmpty(getEndpoint().getKubernetesConfiguration()
+                .getOperation())) {
+            operation = exchange.getIn().getHeader(
+                    KubernetesConstants.KUBERNETES_OPERATION, String.class);
+        } else {
+            operation = getEndpoint().getKubernetesConfiguration()
+                    .getOperation();
+        }
+
+        switch (operation) {
+
+        case KubernetesOperations.LIST_PERSISTENT_VOLUMES_CLAIMS:
+            doList(exchange, operation);
+            break;
+
+        case KubernetesOperations.LIST_PERSISTENT_VOLUMES_CLAIMS_BY_LABELS_OPERATION:
+            doListPersistentVolumesClaimsByLabels(exchange, operation);
+            break;
+
+        case KubernetesOperations.GET_PERSISTENT_VOLUME_CLAIM_OPERATION:
+            doGetPersistentVolumeClaim(exchange, operation);
+            break;
+
+        case KubernetesOperations.CREATE_PERSISTENT_VOLUME_CLAIM_OPERATION:
+            doCreatePersistentVolumeClaim(exchange, operation);
+            break;
+
+        case KubernetesOperations.DELETE_PERSISTENT_VOLUME_CLAIM_OPERATION:
+            doDeletePersistentVolumeClaim(exchange, operation);
+            break;
+
+        default:
+            throw new IllegalArgumentException("Unsupported operation "
+                    + operation);
+        }
+    }
+
+    protected void doList(Exchange exchange, String operation) throws Exception {
+        PersistentVolumeClaimList persistentVolumeClaimList = getEndpoint()
+                .getKubernetesClient().persistentVolumeClaims().list();
+        exchange.getOut().setBody(persistentVolumeClaimList.getItems());
+    }
+
+    protected void doListPersistentVolumesClaimsByLabels(Exchange exchange,
+            String operation) throws Exception {
+        PersistentVolumeClaimList pvcList = null;
+        Map<String, String> labels = exchange
+                .getIn()
+                .getHeader(
+                        KubernetesConstants.KUBERNETES_PERSISTENT_VOLUMES_CLAIMS_LABELS,
+                        Map.class);
+        String namespaceName = exchange.getIn().getHeader(
+                KubernetesConstants.KUBERNETES_NAMESPACE_NAME, String.class);
+        if (!ObjectHelper.isEmpty(namespaceName)) {
+            NonNamespaceOperation<PersistentVolumeClaim, PersistentVolumeClaimList, DoneablePersistentVolumeClaim, 
+            Resource<PersistentVolumeClaim, DoneablePersistentVolumeClaim>> pvcs = getEndpoint().getKubernetesClient().persistentVolumeClaims()
+                    .inNamespace(namespaceName);
+            for (Map.Entry<String, String> entry : labels.entrySet()) {
+                pvcs.withLabel(entry.getKey(), entry.getValue());
+            }
+            pvcList = pvcs.list();
+        } else {
+            MixedOperation<PersistentVolumeClaim, PersistentVolumeClaimList, DoneablePersistentVolumeClaim, 
+            Resource<PersistentVolumeClaim, DoneablePersistentVolumeClaim>> pvcs = getEndpoint().getKubernetesClient().persistentVolumeClaims();
+            for (Map.Entry<String, String> entry : labels.entrySet()) {
+                pvcs.withLabel(entry.getKey(), entry.getValue());
+            }
+            pvcList = pvcs.list();
+        }
+        
+        MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), true);
+        exchange.getOut().setBody(pvcList.getItems());
+    }
+
+    protected void doGetPersistentVolumeClaim(Exchange exchange,
+            String operation) throws Exception {
+        PersistentVolumeClaim pvc = null;
+        String pvcName = exchange.getIn().getHeader(
+                KubernetesConstants.KUBERNETES_PERSISTENT_VOLUME_CLAIM_NAME,
+                String.class);
+        String namespaceName = exchange.getIn().getHeader(
+                KubernetesConstants.KUBERNETES_NAMESPACE_NAME, String.class);
+        if (ObjectHelper.isEmpty(pvcName)) {
+            LOG.error("Get a specific Persistent Volume Claim require specify a Persistent Volume Claim name");
+            throw new IllegalArgumentException(
+                    "Get a specific Persistent Volume Claim require specify a Persistent Volume Claim name");
+        }
+        if (ObjectHelper.isEmpty(namespaceName)) {
+            LOG.error("Get a specific Persistent Volume Claim require specify a namespace name");
+            throw new IllegalArgumentException(
+                    "Get a specific Persistent Volume Claim require specify a namespace name");
+        }
+        pvc = getEndpoint().getKubernetesClient().persistentVolumeClaims()
+                .inNamespace(namespaceName).withName(pvcName).get();
+        
+        MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), true);
+        exchange.getOut().setBody(pvc);
+    }
+
+    protected void doCreatePersistentVolumeClaim(Exchange exchange,
+            String operation) throws Exception {
+        PersistentVolumeClaim pvc = null;
+        String pvcName = exchange.getIn().getHeader(
+                KubernetesConstants.KUBERNETES_PERSISTENT_VOLUME_CLAIM_NAME,
+                String.class);
+        String namespaceName = exchange.getIn().getHeader(
+                KubernetesConstants.KUBERNETES_NAMESPACE_NAME, String.class);
+        PersistentVolumeClaimSpec pvcSpec = exchange.getIn().getHeader(
+                KubernetesConstants.KUBERNETES_PERSISTENT_VOLUME_CLAIM_SPEC,
+                PersistentVolumeClaimSpec.class);
+        if (ObjectHelper.isEmpty(pvcName)) {
+            LOG.error("Create a specific Persistent Volume Claim require specify a Persistent Volume Claim name");
+            throw new IllegalArgumentException(
+                    "Create a specific Persistent Volume Claim require specify a Persistent Volume Claim name");
+        }
+        if (ObjectHelper.isEmpty(namespaceName)) {
+            LOG.error("Create a specific Persistent Volume Claim require specify a namespace name");
+            throw new IllegalArgumentException(
+                    "Create a specific Persistent Volume Claim require specify a namespace name");
+        }
+        if (ObjectHelper.isEmpty(pvcSpec)) {
+            LOG.error("Create a specific Persistent Volume Claim require specify a Persistent Volume Claim spec bean");
+            throw new IllegalArgumentException(
+                    "Create a specific Persistent Volume Claim require specify a Persistent Volume Claim spec bean");
+        }
+        Map<String, String> labels = exchange
+                .getIn()
+                .getHeader(
+                        KubernetesConstants.KUBERNETES_PERSISTENT_VOLUMES_CLAIMS_LABELS,
+                        Map.class);
+        PersistentVolumeClaim pvcCreating = new PersistentVolumeClaimBuilder()
+                .withNewMetadata().withName(pvcName).withLabels(labels)
+                .endMetadata().withSpec(pvcSpec).build();
+        pvc = getEndpoint().getKubernetesClient().persistentVolumeClaims()
+                .inNamespace(namespaceName).create(pvcCreating);
+        
+        MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), true);
+        exchange.getOut().setBody(pvc);
+    }
+
+    protected void doDeletePersistentVolumeClaim(Exchange exchange,
+            String operation) throws Exception {
+        String pvcName = exchange.getIn().getHeader(
+                KubernetesConstants.KUBERNETES_PERSISTENT_VOLUME_CLAIM_NAME,
+                String.class);
+        String namespaceName = exchange.getIn().getHeader(
+                KubernetesConstants.KUBERNETES_NAMESPACE_NAME, String.class);
+        if (ObjectHelper.isEmpty(pvcName)) {
+            LOG.error("Delete a specific Persistent Volume Claim require specify a Persistent Volume Claim name");
+            throw new IllegalArgumentException(
+                    "Delete a specific Persistent Volume Claim require specify a Persistent Volume Claim name");
+        }
+        if (ObjectHelper.isEmpty(namespaceName)) {
+            LOG.error("Delete a specific Persistent Volume Claim require specify a namespace name");
+            throw new IllegalArgumentException(
+                    "Delete a specific Persistent Volume Claim require specify a namespace name");
+        }
+        boolean pvcDeleted = getEndpoint().getKubernetesClient()
+                .persistentVolumeClaims().inNamespace(namespaceName)
+                .withName(pvcName).delete();
+        
+        MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), true);
+        exchange.getOut().setBody(pvcDeleted);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/89eee3b9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsComponent.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsComponent.java
new file mode 100644
index 0000000..5d9d7d3
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsComponent.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.pods;
+
+import org.apache.camel.component.kubernetes.AbstractKubernetesComponent;
+import org.apache.camel.component.kubernetes.KubernetesConfiguration;
+
+public class KubernetesPodsComponent extends AbstractKubernetesComponent {
+
+    protected KubernetesPodsEndpoint doCreateEndpoint(String uri, String remaining, KubernetesConfiguration config) throws Exception {
+        KubernetesPodsEndpoint endpoint = new KubernetesPodsEndpoint(uri, this, config);
+        return endpoint;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/89eee3b9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsConsumer.java
new file mode 100644
index 0000000..d73fb7e
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsConsumer.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.pods;
+
+import java.util.concurrent.ExecutorService;
+
+import io.fabric8.kubernetes.api.model.DoneablePod;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watch;
+import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.dsl.MixedOperation;
+import io.fabric8.kubernetes.client.dsl.PodResource;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.kubernetes.AbstractKubernetesEndpoint;
+import org.apache.camel.component.kubernetes.KubernetesConstants;
+import org.apache.camel.component.kubernetes.consumer.common.PodEvent;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KubernetesPodsConsumer extends DefaultConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesPodsConsumer.class);
+
+    private final Processor processor;
+    private ExecutorService executor;
+    private PodsConsumerTask podsWatcher;
+
+    public KubernetesPodsConsumer(AbstractKubernetesEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+        this.processor = processor;
+    }
+
+    @Override
+    public AbstractKubernetesEndpoint getEndpoint() {
+        return (AbstractKubernetesEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        executor = getEndpoint().createExecutor();
+        
+        podsWatcher = new PodsConsumerTask();
+        executor.submit(podsWatcher);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+
+        LOG.debug("Stopping Kubernetes Pods Consumer");
+        if (executor != null) {
+            if (getEndpoint() != null && getEndpoint().getCamelContext() != null) {
+                if (podsWatcher != null) {
+                    podsWatcher.getWatch().close();
+                }
+                getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+            } else {
+                if (podsWatcher != null) {
+                    podsWatcher.getWatch().close();
+                }
+                executor.shutdownNow();
+            }
+        }
+        executor = null;
+    }
+
+    class PodsConsumerTask implements Runnable {
+
+        private Watch watch;
+        
+        @Override
+        public void run() {
+            MixedOperation<Pod, PodList, DoneablePod, PodResource<Pod, DoneablePod>> w = getEndpoint().getKubernetesClient().pods();
+            if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespace())) {
+                w.inNamespace(getEndpoint().getKubernetesConfiguration().getNamespace());
+            }
+            if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getLabelKey()) 
+                && ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getLabelValue())) {
+                w.withLabel(getEndpoint().getKubernetesConfiguration().getLabelKey(), getEndpoint().getKubernetesConfiguration().getLabelValue());
+            }
+            if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getResourceName())) {
+                w.withName(getEndpoint().getKubernetesConfiguration().getResourceName());
+            }
+            watch = w.watch(new Watcher<Pod>() {
+
+                @Override
+                public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
+                    Pod resource) {
+                    PodEvent pe = new PodEvent(action, resource);
+                    Exchange exchange = getEndpoint().createExchange();
+                    exchange.getIn().setBody(pe.getPod());
+                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, pe.getAction());
+                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
+                    try {
+                        processor.process(exchange);
+                    } catch (Exception e) {
+                        getExceptionHandler().handleException("Error during processing", exchange, e);
+                    }
+                }
+
+                @Override
+                public void onClose(KubernetesClientException cause) {
+                    if (cause != null) {
+                        LOG.error(cause.getMessage(), cause);
+                    }
+
+                }
+            });
+        }
+
+        public Watch getWatch() {
+            return watch;
+        }
+
+        public void setWatch(Watch watch) {
+            this.watch = watch;
+        } 
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/89eee3b9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsEndpoint.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsEndpoint.java
new file mode 100644
index 0000000..3564043
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsEndpoint.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.pods;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.kubernetes.AbstractKubernetesEndpoint;
+import org.apache.camel.component.kubernetes.KubernetesConfiguration;
+import org.apache.camel.component.kubernetes.build_configs.KubernetesBuildConfigsProducer;
+import org.apache.camel.component.kubernetes.builds.KubernetesBuildsProducer;
+import org.apache.camel.component.kubernetes.config_maps.KubernetesConfigMapsProducer;
+import org.apache.camel.component.kubernetes.namespaces.KubernetesNamespacesConsumer;
+import org.apache.camel.component.kubernetes.namespaces.KubernetesNamespacesProducer;
+import org.apache.camel.component.kubernetes.nodes.KubernetesNodesConsumer;
+import org.apache.camel.component.kubernetes.nodes.KubernetesNodesProducer;
+import org.apache.camel.component.kubernetes.persistent_volumes.KubernetesPersistentVolumesProducer;
+import org.apache.camel.component.kubernetes.persistent_volumes_claims.KubernetesPersistentVolumesClaimsProducer;
+import org.apache.camel.component.kubernetes.replication_controllers.KubernetesReplicationControllersConsumer;
+import org.apache.camel.component.kubernetes.replication_controllers.KubernetesReplicationControllersProducer;
+import org.apache.camel.component.kubernetes.resources_quota.KubernetesResourcesQuotaProducer;
+import org.apache.camel.component.kubernetes.secrets.KubernetesSecretsProducer;
+import org.apache.camel.component.kubernetes.service_accounts.KubernetesServiceAccountsProducer;
+import org.apache.camel.component.kubernetes.services.KubernetesServicesConsumer;
+import org.apache.camel.component.kubernetes.services.KubernetesServicesProducer;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Kubernetes Pods component provides a producer to execute kubernetes pod operations
+ * and a consumer to consume pod events.
+ */
+@UriEndpoint(firstVersion = "2.17.0", scheme = "kubernetes-pods", title = "Kubernetes Pods",
+    syntax = "kubernetes-pods:masterUrl", consumerClass = KubernetesPodsConsumer.class, label = "container,cloud,paas")
+public class KubernetesPodsEndpoint extends AbstractKubernetesEndpoint {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesPodsEndpoint.class);
+
+    public KubernetesPodsEndpoint(String uri, KubernetesPodsComponent component, KubernetesConfiguration config) {
+        super(uri, component, config);
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new KubernetesPodsProducer(this);
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new KubernetesPodsConsumer(this, processor);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/89eee3b9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsProducer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsProducer.java
new file mode 100644
index 0000000..61ed679
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsProducer.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.pods;
+
+import java.util.Map;
+
+import io.fabric8.kubernetes.api.model.DoneablePod;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.api.model.PodSpec;
+import io.fabric8.kubernetes.client.dsl.MixedOperation;
+import io.fabric8.kubernetes.client.dsl.PodResource;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.kubernetes.AbstractKubernetesEndpoint;
+import org.apache.camel.component.kubernetes.KubernetesConstants;
+import org.apache.camel.component.kubernetes.KubernetesOperations;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.MessageHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KubernetesPodsProducer extends DefaultProducer {
+
+    private static final Logger LOG = LoggerFactory
+            .getLogger(KubernetesPodsProducer.class);
+
+    public KubernetesPodsProducer(AbstractKubernetesEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    @Override
+    public AbstractKubernetesEndpoint getEndpoint() {
+        return (AbstractKubernetesEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        String operation;
+
+        if (ObjectHelper.isEmpty(getEndpoint().getKubernetesConfiguration()
+                .getOperation())) {
+            operation = exchange.getIn().getHeader(
+                    KubernetesConstants.KUBERNETES_OPERATION, String.class);
+        } else {
+            operation = getEndpoint().getKubernetesConfiguration()
+                    .getOperation();
+        }
+
+        switch (operation) {
+
+        case KubernetesOperations.LIST_PODS_OPERATION:
+            doList(exchange, operation);
+            break;
+
+        case KubernetesOperations.LIST_PODS_BY_LABELS_OPERATION:
+            doListPodsByLabel(exchange, operation);
+            break;
+
+        case KubernetesOperations.GET_POD_OPERATION:
+            doGetPod(exchange, operation);
+            break;
+
+        case KubernetesOperations.CREATE_POD_OPERATION:
+            doCreatePod(exchange, operation);
+            break;
+
+        case KubernetesOperations.DELETE_POD_OPERATION:
+            doDeletePod(exchange, operation);
+            break;
+
+        default:
+            throw new IllegalArgumentException("Unsupported operation "
+                    + operation);
+        }
+    }
+
+    protected void doList(Exchange exchange, String operation) throws Exception {
+        PodList podList = getEndpoint().getKubernetesClient().pods().list();
+        
+        MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), true);
+        exchange.getOut().setBody(podList.getItems());
+    }
+
+    protected void doListPodsByLabel(Exchange exchange, String operation) {
+        Map<String, String> labels = exchange.getIn().getHeader(
+                KubernetesConstants.KUBERNETES_PODS_LABELS, Map.class);
+        if (ObjectHelper.isEmpty(labels)) {
+            LOG.error("Get pods by labels require specify a labels set");
+            throw new IllegalArgumentException(
+                    "Get pods by labels require specify a labels set");
+        }
+        
+        MixedOperation<Pod, PodList, DoneablePod, PodResource<Pod, DoneablePod>> pods = getEndpoint().getKubernetesClient().pods();
+        for (Map.Entry<String, String> entry : labels.entrySet()) {
+            pods.withLabel(entry.getKey(), entry.getValue());
+        }
+        PodList podList = pods.list();
+        
+        MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), true);
+        exchange.getOut().setBody(podList.getItems());
+    }
+
+    protected void doGetPod(Exchange exchange, String operation)
+            throws Exception {
+        Pod pod = null;
+        String podName = exchange.getIn().getHeader(
+                KubernetesConstants.KUBERNETES_POD_NAME, String.class);
+        String namespaceName = exchange.getIn().getHeader(
+                KubernetesConstants.KUBERNETES_NAMESPACE_NAME, String.class);
+        if (ObjectHelper.isEmpty(podName)) {
+            LOG.error("Get a specific pod require specify a pod name");
+            throw new IllegalArgumentException(
+                    "Get a specific pod require specify a pod name");
+        }
+        if (ObjectHelper.isEmpty(namespaceName)) {
+            LOG.error("Get a specific pod require specify a namespace name");
+            throw new IllegalArgumentException(
+                    "Get a specific pod require specify a namespace name");
+        }
+        pod = getEndpoint().getKubernetesClient().pods()
+                .inNamespace(namespaceName).withName(podName).get();
+        
+        MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), true);
+        exchange.getOut().setBody(pod);
+    }
+
+    protected void doCreatePod(Exchange exchange, String operation)
+            throws Exception {
+        Pod pod = null;
+        String podName = exchange.getIn().getHeader(
+                KubernetesConstants.KUBERNETES_POD_NAME, String.class);
+        String namespaceName = exchange.getIn().getHeader(
+                KubernetesConstants.KUBERNETES_NAMESPACE_NAME, String.class);
+        PodSpec podSpec = exchange.getIn().getHeader(
+                KubernetesConstants.KUBERNETES_POD_SPEC, PodSpec.class);
+        if (ObjectHelper.isEmpty(podName)) {
+            LOG.error("Create a specific pod require specify a pod name");
+            throw new IllegalArgumentException(
+                    "Create a specific pod require specify a pod name");
+        }
+        if (ObjectHelper.isEmpty(namespaceName)) {
+            LOG.error("Create a specific pod require specify a namespace name");
+            throw new IllegalArgumentException(
+                    "Create a specific pod require specify a namespace name");
+        }
+        if (ObjectHelper.isEmpty(podSpec)) {
+            LOG.error("Create a specific pod require specify a pod spec bean");
+            throw new IllegalArgumentException(
+                    "Create a specific pod require specify a pod spec bean");
+        }
+        Map<String, String> labels = exchange.getIn().getHeader(
+                KubernetesConstants.KUBERNETES_PODS_LABELS, Map.class);
+        Pod podCreating = new PodBuilder().withNewMetadata()
+                .withName(podName).withLabels(labels).endMetadata()
+                .withSpec(podSpec).build();
+        pod = getEndpoint().getKubernetesClient().pods()
+                .inNamespace(namespaceName).create(podCreating);
+        
+        MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), true);
+        exchange.getOut().setBody(pod);
+    }
+
+    protected void doDeletePod(Exchange exchange, String operation)
+            throws Exception {
+        String podName = exchange.getIn().getHeader(
+                KubernetesConstants.KUBERNETES_POD_NAME, String.class);
+        String namespaceName = exchange.getIn().getHeader(
+                KubernetesConstants.KUBERNETES_NAMESPACE_NAME, String.class);
+        if (ObjectHelper.isEmpty(podName)) {
+            LOG.error("Delete a specific pod require specify a pod name");
+            throw new IllegalArgumentException(
+                    "Delete a specific pod require specify a pod name");
+        }
+        if (ObjectHelper.isEmpty(namespaceName)) {
+            LOG.error("Delete a specific pod require specify a namespace name");
+            throw new IllegalArgumentException(
+                    "Delete a specific pod require specify a namespace name");
+        }
+        boolean podDeleted = getEndpoint().getKubernetesClient().pods()
+                .inNamespace(namespaceName).withName(podName).delete();
+        
+        MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), true);
+        exchange.getOut().setBody(podDeleted);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/89eee3b9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/producer/KubernetesBuildConfigsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/producer/KubernetesBuildConfigsProducer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/producer/KubernetesBuildConfigsProducer.java
deleted file mode 100644
index ab77529..0000000
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/producer/KubernetesBuildConfigsProducer.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kubernetes.producer;
-
-import java.util.Map;
-
-import io.fabric8.kubernetes.client.dsl.MixedOperation;
-import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
-import io.fabric8.openshift.api.model.Build;
-import io.fabric8.openshift.api.model.BuildConfig;
-import io.fabric8.openshift.api.model.BuildConfigList;
-import io.fabric8.openshift.api.model.DoneableBuildConfig;
-import io.fabric8.openshift.client.OpenShiftClient;
-import io.fabric8.openshift.client.dsl.BuildConfigResource;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.component.kubernetes.KubernetesConstants;
-import org.apache.camel.component.kubernetes.KubernetesEndpoint;
-import org.apache.camel.impl.DefaultProducer;
-import org.apache.camel.util.MessageHelper;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KubernetesBuildConfigsProducer extends DefaultProducer {
-
-    private static final Logger LOG = LoggerFactory.getLogger(KubernetesBuildConfigsProducer.class);
-
-    public KubernetesBuildConfigsProducer(KubernetesEndpoint endpoint) {
-        super(endpoint);
-    }
-
-    @Override
-    public KubernetesEndpoint getEndpoint() {
-        return (KubernetesEndpoint) super.getEndpoint();
-    }
-
-    @Override
-    public void process(Exchange exchange) throws Exception {
-        String operation;
-
-        if (ObjectHelper.isEmpty(getEndpoint().getKubernetesConfiguration().getOperation())) {
-            operation = exchange.getIn().getHeader(KubernetesConstants.KUBERNETES_OPERATION, String.class);
-        } else {
-            operation = getEndpoint().getKubernetesConfiguration().getOperation();
-        }
-
-        switch (operation) {
-
-        case KubernetesOperations.LIST_BUILD_CONFIGS:
-            doList(exchange, operation);
-            break;
-
-        case KubernetesOperations.LIST_BUILD_CONFIGS_BY_LABELS_OPERATION:
-            doListBuildConfigsByLabels(exchange, operation);
-            break;
-
-        case KubernetesOperations.GET_BUILD_CONFIG_OPERATION:
-            doGetBuildConfig(exchange, operation);
-            break;
-
-        default:
-            throw new IllegalArgumentException("Unsupported operation " + operation);
-        }
-    }
-
-    protected void doList(Exchange exchange, String operation) throws Exception {
-        BuildConfigList buildConfigsList = getEndpoint().getKubernetesClient().adapt(OpenShiftClient.class)
-                .buildConfigs().list();
-        exchange.getOut().setBody(buildConfigsList.getItems());
-    }
-
-    protected void doListBuildConfigsByLabels(Exchange exchange, String operation) throws Exception {
-        BuildConfigList buildConfigsList = null;
-        Map<String, String> labels = exchange.getIn().getHeader(KubernetesConstants.KUBERNETES_BUILD_CONFIGS_LABELS,
-                Map.class);
-        String namespaceName = exchange.getIn().getHeader(KubernetesConstants.KUBERNETES_NAMESPACE_NAME, String.class);
-        if (!ObjectHelper.isEmpty(namespaceName)) {
-            NonNamespaceOperation<BuildConfig, BuildConfigList, DoneableBuildConfig, 
-                BuildConfigResource<BuildConfig, DoneableBuildConfig, Void, Build>> buildConfigs; 
-            buildConfigs = getEndpoint().getKubernetesClient().adapt(OpenShiftClient.class).buildConfigs()
-                    .inNamespace(namespaceName);
-            for (Map.Entry<String, String> entry : labels.entrySet()) {
-                buildConfigs.withLabel(entry.getKey(), entry.getValue());
-            }
-            buildConfigsList = buildConfigs.list();
-        } else {
-            MixedOperation<BuildConfig, BuildConfigList, DoneableBuildConfig, 
-                BuildConfigResource<BuildConfig, DoneableBuildConfig, Void, Build>> buildConfigs; 
-            buildConfigs = getEndpoint().getKubernetesClient().adapt(OpenShiftClient.class).buildConfigs();
-            for (Map.Entry<String, String> entry : labels.entrySet()) {
-                buildConfigs.withLabel(entry.getKey(), entry.getValue());
-            }
-            buildConfigsList = buildConfigs.list();
-        }
-        MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), true);
-        exchange.getOut().setBody(buildConfigsList.getItems());
-    }
-
-    protected void doGetBuildConfig(Exchange exchange, String operation) throws Exception {
-        BuildConfig buildConfig = null;
-        String buildConfigName = exchange.getIn().getHeader(KubernetesConstants.KUBERNETES_BUILD_CONFIG_NAME,
-                String.class);
-        String namespaceName = exchange.getIn().getHeader(KubernetesConstants.KUBERNETES_NAMESPACE_NAME, String.class);
-        if (ObjectHelper.isEmpty(buildConfigName)) {
-            LOG.error("Get a specific Build Config require specify a Build Config name");
-            throw new IllegalArgumentException("Get a specific Build Config require specify a Build Config name");
-        }
-        if (ObjectHelper.isEmpty(namespaceName)) {
-            LOG.error("Get a specific Build Config require specify a namespace name");
-            throw new IllegalArgumentException("Get a specific Build Config require specify a namespace name");
-        }
-        buildConfig = getEndpoint().getKubernetesClient().adapt(OpenShiftClient.class).buildConfigs()
-                .inNamespace(namespaceName).withName(buildConfigName).get();
-        
-        MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), true);
-        exchange.getOut().setBody(buildConfig);
-    }
-}


Mime
View raw message