Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8CE7F200B31 for ; Tue, 24 May 2016 14:45:39 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 8B932160A36; Tue, 24 May 2016 12:45:39 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4875F160A35 for ; Tue, 24 May 2016 14:45:37 +0200 (CEST) Received: (qmail 92654 invoked by uid 500); 24 May 2016 12:45:36 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 92636 invoked by uid 99); 24 May 2016 12:45:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 May 2016 12:45:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 40D1FDFF32; Tue, 24 May 2016 12:45:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Date: Tue, 24 May 2016 12:45:37 -0000 Message-Id: <647db5f3cf664476a1600f6013f6606a@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/4] camel git commit: CAMEL-9683: A new toService EIP that uses a client discovery to lookup alive services and pick a service ip/port to use when calling the service from Camel route. Allows to plugin different providers. 
Added camel-ribbon as impleme archived-at: Tue, 24 May 2016 12:45:39 -0000 http://git-wip-us.apache.org/repos/asf/camel/blob/4b81b4ae/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesClientServiceCallProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesClientServiceCallProcessor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesClientServiceCallProcessor.java new file mode 100644 index 0000000..f1af317 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesClientServiceCallProcessor.java @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.kubernetes.processor; + +import java.util.Collection; +import java.util.concurrent.RejectedExecutionException; + +import io.fabric8.kubernetes.client.AutoAdaptableKubernetesClient; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.ConfigBuilder; +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Traceable; +import org.apache.camel.component.kubernetes.KubernetesConfiguration; +import org.apache.camel.component.kubernetes.KubernetesConstants; +import org.apache.camel.processor.SendDynamicProcessor; +import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.ServiceCallLoadBalancer; +import org.apache.camel.spi.ServiceCallServerListStrategy; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.AsyncProcessorHelper; +import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.ServiceHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Kubernetes based implementation of the ServiceCall EIP where the service lookup is client based. 
+ */
+public class KubernetesClientServiceCallProcessor extends ServiceSupport implements AsyncProcessor, CamelContextAware, Traceable, IdAware {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesClientServiceCallProcessor.class);
+
+    private CamelContext camelContext;
+    private String id;
+    private final String name;
+    private final String scheme;
+    private final String contextPath;
+    private final String namespace;
+    private final String uri;
+    private final ExchangePattern exchangePattern;
+    private final KubernetesConfiguration configuration;
+    private final KubernetesServiceCallExpression serviceCallExpression;
+    // the type arguments are required: process() assigns the chosen server to a KubernetesServer without a cast
+    private ServiceCallServerListStrategy<KubernetesServer> serverListStrategy;
+    private ServiceCallLoadBalancer<KubernetesServer> loadBalancer;
+    private SendDynamicProcessor processor;
+
+    /**
+     * Creates the processor.
+     *
+     * @param name            the service name; may be prefixed with <tt>scheme:</tt> and followed by
+     *                        <tt>/context-path</tt> (or <tt>?...</tt>), both of which are split off here
+     * @param namespace       the namespace; when <tt>null</tt> the KUBERNETES_NAMESPACE environment variable is used
+     * @param scheme          the scheme to use when the name does not carry one
+     * @param uri             optional uri passed on to the service call expression and the dynamic sender
+     * @param exchangePattern optional exchange pattern to set on the dynamic sender
+     * @param configuration   the configuration used to create the Kubernetes client
+     */
+    public KubernetesClientServiceCallProcessor(String name, String namespace, String scheme, String uri, ExchangePattern exchangePattern, KubernetesConfiguration configuration) {
+        // setup from the provided name which can contain scheme and context-path information as well
+        String serviceName;
+        if (name.contains("/")) {
+            serviceName = ObjectHelper.before(name, "/");
+            this.contextPath = ObjectHelper.after(name, "/");
+        } else if (name.contains("?")) {
+            serviceName = ObjectHelper.before(name, "?");
+            this.contextPath = ObjectHelper.after(name, "?");
+        } else {
+            serviceName = name;
+            this.contextPath = null;
+        }
+        if (serviceName.contains(":")) {
+            this.scheme = ObjectHelper.before(serviceName, ":");
+            this.name = ObjectHelper.after(serviceName, ":");
+        } else {
+            this.scheme = scheme;
+            this.name = serviceName;
+        }
+
+        // if no namespace configured then resolve from environment variables
+        if (namespace == null) {
+            this.namespace = System.getenv("KUBERNETES_NAMESPACE");
+        } else {
+            this.namespace = namespace;
+        }
+        this.uri = uri;
+        this.exchangePattern = exchangePattern;
+        this.configuration = configuration;
+        this.serviceCallExpression = new KubernetesServiceCallExpression(this.name, this.scheme, this.contextPath, this.uri);
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+    /**
+     * Looks up the current servers, lets the load balancer pick one, stores its ip and port as
+     * headers on the IN message and hands the exchange over to the dynamic send processor.
+     * <p/>
+     * Any failure during lookup or server selection is set as exception on the exchange and the
+     * callback is completed synchronously, so no exception is thrown to the caller.
+     */
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        Collection<KubernetesServer> servers = null;
+        try {
+            servers = serverListStrategy.getUpdatedListOfServers();
+            if (servers == null || servers.isEmpty()) {
+                exchange.setException(new RejectedExecutionException("No active services with name " + name + " in namespace " + namespace));
+            }
+        } catch (Throwable e) {
+            exchange.setException(e);
+        }
+
+        if (exchange.getException() != null) {
+            callback.done(true);
+            return true;
+        }
+
+        // let the client load balancer chose which server to use
+        KubernetesServer server;
+        try {
+            server = loadBalancer.chooseServer(servers);
+        } catch (Throwable e) {
+            // a failing (custom) load balancer must not leave the callback uncompleted
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+        if (server == null) {
+            exchange.setException(new RejectedExecutionException("No active services with name " + name + " in namespace " + namespace));
+            callback.done(true);
+            return true;
+        }
+
+        String ip = server.getIp();
+        int port = server.getPort();
+        LOG.debug("Service {} active at server: {}:{}", name, ip, port);
+
+        // set selected server as header
+        exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_SERVER_IP, ip);
+        exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_SERVER_PORT, port);
+
+        // use the dynamic send processor to call the service
+        return processor.process(exchange, callback);
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    @Override
+    public String getTraceLabel() {
+        return "kubernetes";
+    }
+
+    public ServiceCallLoadBalancer<KubernetesServer> getLoadBalancer() {
+        return loadBalancer;
+    }
+
+    /**
+     * Sets the load balancer to use; when left <tt>null</tt> a {@link RandomLoadBalancer} is created on start.
+     */
+    public void setLoadBalancer(ServiceCallLoadBalancer<KubernetesServer> loadBalancer) {
+        this.loadBalancer = loadBalancer;
+    }
+
+    public ServiceCallServerListStrategy<KubernetesServer> getServerListStrategy() {
+        return serverListStrategy;
+    }
+
+    /**
+     * Sets the server list strategy to use; when left <tt>null</tt> a Kubernetes client based strategy is created on start.
+     */
+    public void setServerListStrategy(ServiceCallServerListStrategy<KubernetesServer> serverListStrategy) {
+        this.serverListStrategy = serverListStrategy;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ObjectHelper.notEmpty(name, "name", this);
+        ObjectHelper.notEmpty(namespace, "namespace", this);
+        ObjectHelper.notEmpty(configuration.getMasterUrl(), "masterUrl", this);
+
+        if (loadBalancer == null) {
+            loadBalancer = new RandomLoadBalancer();
+        }
+        if (serverListStrategy == null) {
+            serverListStrategy = new KubernetesServiceCallServerListStrategy(name, namespace, null, createKubernetesClient());
+        }
+        LOG.info("KubernetesServiceCall at namespace: {} with service name: {} is using load balancer: {} and service discovery: {}", namespace, name, loadBalancer, serverListStrategy);
+
+        processor = new SendDynamicProcessor(uri, serviceCallExpression);
+        processor.setCamelContext(getCamelContext());
+        if (exchangePattern != null) {
+            processor.setPattern(exchangePattern);
+        }
+        ServiceHelper.startServices(serverListStrategy, processor);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopServices(processor, serverListStrategy);
+    }
+
+    /**
+     * Builds a Kubernetes client from the configuration. Username/password is only used when both
+     * are set and no OAuth token is configured; otherwise the OAuth token is applied.
+     */
+    private AutoAdaptableKubernetesClient createKubernetesClient() {
+        LOG.debug("Create Kubernetes client with the following Configuration: {}", configuration);
+
+        ConfigBuilder builder = new ConfigBuilder();
+        builder.withMasterUrl(configuration.getMasterUrl());
+        if ((ObjectHelper.isNotEmpty(configuration.getUsername())
+                && ObjectHelper.isNotEmpty(configuration.getPassword()))
+                && ObjectHelper.isEmpty(configuration.getOauthToken())) {
+            builder.withUsername(configuration.getUsername());
+            builder.withPassword(configuration.getPassword());
+        } else {
+            builder.withOauthToken(configuration.getOauthToken());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getCaCertData())) {
+            builder.withCaCertData(configuration.getCaCertData());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getCaCertFile())) {
+            builder.withCaCertFile(configuration.getCaCertFile());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getClientCertData())) {
+            builder.withClientCertData(configuration.getClientCertData());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getClientCertFile())) {
+            builder.withClientCertFile(configuration.getClientCertFile());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getApiVersion())) {
+            builder.withApiVersion(configuration.getApiVersion());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getClientKeyAlgo())) {
+            builder.withClientKeyAlgo(configuration.getClientKeyAlgo());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getClientKeyData())) {
+            builder.withClientKeyData(configuration.getClientKeyData());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getClientKeyFile())) {
+            builder.withClientKeyFile(configuration.getClientKeyFile());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getClientKeyPassphrase())) {
+            builder.withClientKeyPassphrase(configuration.getClientKeyPassphrase());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getTrustCerts())) {
+            builder.withTrustCerts(configuration.getTrustCerts());
+        }
+
+        Config conf = builder.build();
+        return new AutoAdaptableKubernetesClient(conf);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/4b81b4ae/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesDnsServiceCallExpression.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesDnsServiceCallExpression.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesDnsServiceCallExpression.java
new file mode 100644
index 0000000..d5e3751
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesDnsServiceCallExpression.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to 
the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.support.ExpressionAdapter;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Expression that builds the Camel endpoint uri for calling a service through its
+ * <tt>name.namespace.svc.dnsDomain</tt> host name.
+ */
+public class KubernetesDnsServiceCallExpression extends ExpressionAdapter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesDnsServiceCallExpression.class);
+
+    private final String name;
+    private final String namespace;
+    private final String scheme;
+    private final String contextPath;
+    private final String uri;
+    private final String dnsDomain;
+
+    public KubernetesDnsServiceCallExpression(String name, String namespace, String scheme, String contextPath, String uri, String dnsDomain) {
+        this.name = name;
+        this.namespace = namespace;
+        this.scheme = scheme;
+        this.contextPath = contextPath;
+        this.uri = uri;
+        this.dnsDomain = dnsDomain;
+    }
+
+    /**
+     * Returns the endpoint uri computed from the values given at construction time; the exchange is not inspected.
+     */
+    @Override
+    public Object evaluate(Exchange exchange) {
+        try {
+            return buildCamelEndpointUri(name, namespace, uri, contextPath, scheme, dnsDomain);
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+    /**
+     * Builds the endpoint uri.
+     * <p/>
+     * Without an explicit uri the result is <tt>scheme://name.namespace.svc.dnsDomain[/contextPath]</tt>
+     * with <tt>http</tt> as default scheme. With an explicit uri the first occurrence of the service
+     * name in that uri is replaced by the DNS host name; a uri not containing the name is returned as is.
+     */
+    protected static String buildCamelEndpointUri(String name, String namespace, String uri, String contextPath, String scheme, String dnsDomain) {
+        // build basic uri if none provided
+        String answer = uri;
+        if (answer == null) {
+            if (scheme == null) {
+                // use http by default if no scheme has been configured
+                scheme = "http";
+            }
+            answer = scheme + "://" + asKubernetesDnsServicePart(name, namespace, dnsDomain);
+            if (contextPath != null) {
+                answer += "/" + contextPath;
+            }
+        } else {
+            // we have existing uri, then replace the serviceName with name.namespace.svc.dnsDomain
+            // the name must be matched literally: String.replaceFirst would treat it as a regular expression
+            // (a dot in the name matches any character) and the replacement as a replacement pattern
+            int pos = answer.indexOf(name);
+            if (pos >= 0) {
+                answer = answer.substring(0, pos) + asKubernetesDnsServicePart(name, namespace, dnsDomain) + answer.substring(pos + name.length());
+            }
+        }
+
+        LOG.debug("Camel endpoint uri: {} for calling service: {}", answer, name);
+        return answer;
+    }
+
+    /**
+     * Returns the DNS host name of the service in the form <tt>name.namespace.svc.dnsDomain</tt>.
+     */
+    protected static String asKubernetesDnsServicePart(String name, String namespace, String dnsDomain) {
+        return name + "." + namespace + ".svc." + dnsDomain;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/4b81b4ae/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesDnsServiceCallProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesDnsServiceCallProcessor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesDnsServiceCallProcessor.java
new file mode 100644
index 0000000..8e66086
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesDnsServiceCallProcessor.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.processor; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Traceable; +import org.apache.camel.component.kubernetes.KubernetesConstants; +import org.apache.camel.processor.SendDynamicProcessor; +import org.apache.camel.spi.IdAware; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.AsyncProcessorHelper; +import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.ServiceHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Kubernetes based implementation of the ServiceCall EIP where the service lookup is DNS based. 
+ */
+public class KubernetesDnsServiceCallProcessor extends ServiceSupport implements AsyncProcessor, CamelContextAware, Traceable, IdAware {
+
+    // log under this class' own category
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesDnsServiceCallProcessor.class);
+
+    private CamelContext camelContext;
+    private String id;
+    private final String name;
+    private final String scheme;
+    private final String contextPath;
+    private final String namespace;
+    private final String uri;
+    private final String dnsDomain;
+    private final ExchangePattern exchangePattern;
+    private final KubernetesDnsServiceCallExpression serviceCallExpression;
+    private SendDynamicProcessor processor;
+
+    /**
+     * Creates the processor.
+     *
+     * @param name            the service name; may be prefixed with <tt>scheme:</tt> and followed by
+     *                        <tt>/context-path</tt> (or <tt>?...</tt>), both of which are split off here
+     * @param namespace       the namespace; when <tt>null</tt> the KUBERNETES_NAMESPACE environment variable is used
+     * @param scheme          the scheme to use when the name does not carry one
+     * @param uri             optional uri passed on to the service call expression and the dynamic sender
+     * @param exchangePattern optional exchange pattern to set on the dynamic sender
+     * @param dnsDomain       the DNS domain used to build the service host name
+     */
+    public KubernetesDnsServiceCallProcessor(String name, String namespace, String scheme, String uri, ExchangePattern exchangePattern, String dnsDomain) {
+        // setup from the provided name which can contain scheme and context-path information as well
+        String serviceName;
+        if (name.contains("/")) {
+            serviceName = ObjectHelper.before(name, "/");
+            this.contextPath = ObjectHelper.after(name, "/");
+        } else if (name.contains("?")) {
+            serviceName = ObjectHelper.before(name, "?");
+            this.contextPath = ObjectHelper.after(name, "?");
+        } else {
+            serviceName = name;
+            this.contextPath = null;
+        }
+        if (serviceName.contains(":")) {
+            this.scheme = ObjectHelper.before(serviceName, ":");
+            this.name = ObjectHelper.after(serviceName, ":");
+        } else {
+            this.scheme = scheme;
+            this.name = serviceName;
+        }
+
+        // if no namespace configured then resolve from environment variables
+        if (namespace == null) {
+            this.namespace = System.getenv("KUBERNETES_NAMESPACE");
+        } else {
+            this.namespace = namespace;
+        }
+        this.uri = uri;
+        this.exchangePattern = exchangePattern;
+        this.dnsDomain = dnsDomain;
+        // the endpoint uri is computed from name, namespace and DNS domain, so use the DNS specific expression
+        this.serviceCallExpression = new KubernetesDnsServiceCallExpression(this.name, this.namespace, this.scheme, this.contextPath, this.uri, this.dnsDomain);
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+    /**
+     * Hands the exchange over to the dynamic send processor, which evaluates the DNS based
+     * expression to compute the endpoint uri. No server ip/port is selected by this processor,
+     * hence no server headers are set on the message.
+     */
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        LOG.debug("Calling service {} at namespace: {} using DNS domain {}", name, namespace, dnsDomain);
+
+        // use the dynamic send processor to call the service
+        return processor.process(exchange, callback);
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    @Override
+    public String getTraceLabel() {
+        return "kubernetes";
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ObjectHelper.notEmpty(name, "name", this);
+        ObjectHelper.notEmpty(namespace, "namespace", this);
+        ObjectHelper.notEmpty(dnsDomain, "dnsDomain", this);
+
+        LOG.info("KubernetesServiceCall at namespace: {} with service name: {} using DNS domain {}", namespace, name, dnsDomain);
+
+        processor = new SendDynamicProcessor(uri, serviceCallExpression);
+        processor.setCamelContext(getCamelContext());
+        if (exchangePattern != null) {
+            processor.setPattern(exchangePattern);
+        }
+        ServiceHelper.startServices(processor);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopServices(processor);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/4b81b4ae/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesEnvironmentServiceCallProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesEnvironmentServiceCallProcessor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesEnvironmentServiceCallProcessor.java new file mode 100644 index 0000000..378f19a --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesEnvironmentServiceCallProcessor.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.camel.component.kubernetes.processor;
+
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Traceable;
+import org.apache.camel.component.kubernetes.KubernetesConstants;
+import org.apache.camel.processor.SendDynamicProcessor;
+import org.apache.camel.spi.IdAware;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Kubernetes based implementation of the ServiceCall EIP where the service lookup is environment variable based.
+ * <p/>
+ * The service ip and port are resolved once on startup using the <tt>service.host</tt> and
+ * <tt>service.port</tt> property placeholder functions.
+ */
+public class KubernetesEnvironmentServiceCallProcessor extends ServiceSupport implements AsyncProcessor, CamelContextAware, Traceable, IdAware {
+
+    // log under this class' own category
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesEnvironmentServiceCallProcessor.class);
+
+    private CamelContext camelContext;
+    private String id;
+    private final String name;
+    private final String scheme;
+    private final String contextPath;
+    private final String namespace;
+    private final String uri;
+    private final ExchangePattern exchangePattern;
+    private final KubernetesServiceCallExpression serviceCallExpression;
+    private SendDynamicProcessor processor;
+    private String ip;
+    private long port;
+
+    /**
+     * Creates the processor.
+     *
+     * @param name            the service name; may be prefixed with <tt>scheme:</tt> and followed by
+     *                        <tt>/context-path</tt> (or <tt>?...</tt>), both of which are split off here
+     * @param namespace       the namespace; when <tt>null</tt> the KUBERNETES_NAMESPACE environment variable is used
+     * @param scheme          the scheme to use when the name does not carry one
+     * @param uri             optional uri passed on to the service call expression and the dynamic sender
+     * @param exchangePattern optional exchange pattern to set on the dynamic sender
+     */
+    public KubernetesEnvironmentServiceCallProcessor(String name, String namespace, String scheme, String uri, ExchangePattern exchangePattern) {
+        // setup from the provided name which can contain scheme and context-path information as well
+        String serviceName;
+        if (name.contains("/")) {
+            serviceName = ObjectHelper.before(name, "/");
+            this.contextPath = ObjectHelper.after(name, "/");
+        } else if (name.contains("?")) {
+            serviceName = ObjectHelper.before(name, "?");
+            this.contextPath = ObjectHelper.after(name, "?");
+        } else {
+            serviceName = name;
+            this.contextPath = null;
+        }
+        if (serviceName.contains(":")) {
+            this.scheme = ObjectHelper.before(serviceName, ":");
+            this.name = ObjectHelper.after(serviceName, ":");
+        } else {
+            this.scheme = scheme;
+            this.name = serviceName;
+        }
+
+        // if no namespace configured then resolve from environment variables
+        if (namespace == null) {
+            this.namespace = System.getenv("KUBERNETES_NAMESPACE");
+        } else {
+            this.namespace = namespace;
+        }
+        this.uri = uri;
+        this.exchangePattern = exchangePattern;
+        this.serviceCallExpression = new KubernetesServiceCallExpression(this.name, this.scheme, this.contextPath, this.uri);
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+    /**
+     * Stores the ip and port resolved on startup as headers on the IN message and hands the
+     * exchange over to the dynamic send processor. When no ip was resolved the exchange fails
+     * with a {@link RejectedExecutionException} and the callback is completed synchronously.
+     */
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        if (ip == null) {
+            exchange.setException(new RejectedExecutionException("No active services with name " + name + " in namespace " + namespace));
+        }
+        if (exchange.getException() != null) {
+            callback.done(true);
+            return true;
+        }
+
+        LOG.debug("Service {} active at server: {}:{}", name, ip, port);
+
+        // set selected server as header
+        exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_SERVER_IP, ip);
+        exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_SERVER_PORT, port);
+
+        // use the dynamic send processor to call the service
+        return processor.process(exchange, callback);
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    @Override
+    public String getTraceLabel() {
+        return "kubernetes";
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        // validate first, as the name is used to build the placeholders below
+        ObjectHelper.notEmpty(name, "name", this);
+        ObjectHelper.notEmpty(namespace, "namespace", this);
+
+        // lookup ENV on startup
+        ip = getCamelContext().resolvePropertyPlaceholders("{{service.host:" + name + "}}");
+        String num = getCamelContext().resolvePropertyPlaceholders("{{service.port:" + name + "}}");
+        // tryConvertTo returns null when the value cannot be converted; unboxing that into the
+        // primitive field would fail with a NullPointerException that hides the real cause
+        Long number = getCamelContext().getTypeConverter().tryConvertTo(Long.class, num);
+        if (number == null) {
+            throw new IllegalArgumentException("Cannot convert port: " + num + " to a number for service: " + name);
+        }
+        port = number;
+
+        LOG.info("KubernetesServiceCall at namespace: {} with service name: {} using {}:{}", namespace, name, ip, port);
+
+        processor = new SendDynamicProcessor(uri, serviceCallExpression);
+        processor.setCamelContext(getCamelContext());
+        if (exchangePattern != null) {
+            processor.setPattern(exchangePattern);
+        }
+        ServiceHelper.startServices(processor);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopServices(processor);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/4b81b4ae/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java
new file mode 100644
index 0000000..8c1324c
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.processor; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.camel.ExchangePattern; +import org.apache.camel.Processor; +import org.apache.camel.component.kubernetes.KubernetesConfiguration; +import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.model.remote.KubernetesConfigurationDefinition; +import org.apache.camel.model.remote.ServiceCallConfigurationDefinition; +import org.apache.camel.model.remote.ServiceCallDefinition; +import org.apache.camel.spi.ProcessorFactory; +import org.apache.camel.spi.RouteContext; +import org.apache.camel.spi.ServiceCallLoadBalancer; +import org.apache.camel.spi.ServiceCallServerListStrategy; +import org.apache.camel.util.CamelContextHelper; +import org.apache.camel.util.IntrospectionSupport; + +/** + * {@link ProcessorFactory} that creates the Kubernetes implementation of the ServiceCall EIP. 
+ */
+public class KubernetesProcessorFactory implements ProcessorFactory {
+
+    @Override
+    public Processor createChildProcessor(RouteContext routeContext, ProcessorDefinition definition, boolean mandatory) throws Exception {
+        // not in use
+        return null;
+    }
+
+    /**
+     * Creates the Kubernetes processor for a {@link ServiceCallDefinition}; returns <tt>null</tt>
+     * for any other definition.
+     * <p/>
+     * The configuration is taken from the definition itself and/or from a referenced configuration
+     * (registry first, then the service call configurations of the CamelContext). Options set on the
+     * definition's own configuration override those of the referenced one. The <tt>lookup</tt> option
+     * selects the processor: <tt>client</tt>, <tt>dns</tt>, or environment based for any other value.
+     *
+     * @throws IllegalStateException if no configuration can be found for the service call
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public Processor createProcessor(RouteContext routeContext, ProcessorDefinition definition) throws Exception {
+        if (definition instanceof ServiceCallDefinition) {
+            ServiceCallDefinition sc = (ServiceCallDefinition) definition;
+
+            String name = sc.getName();
+            String uri = sc.getUri();
+            ExchangePattern mep = sc.getPattern();
+
+            KubernetesConfigurationDefinition config = (KubernetesConfigurationDefinition) sc.getServiceCallConfiguration();
+            KubernetesConfigurationDefinition configRef = null;
+            if (sc.getServiceCallConfigurationRef() != null) {
+                // lookup in registry first
+                configRef = CamelContextHelper.lookup(routeContext.getCamelContext(), sc.getServiceCallConfigurationRef(), KubernetesConfigurationDefinition.class);
+                if (configRef == null) {
+                    // and fallback as service configuration (the result must be kept, otherwise the reference is silently ignored)
+                    configRef = routeContext.getCamelContext().getServiceCallConfiguration(sc.getServiceCallConfigurationRef(), KubernetesConfigurationDefinition.class);
+                }
+            }
+
+            // if no configuration explicit configured then use default
+            if (config == null && configRef == null) {
+                config = routeContext.getCamelContext().getServiceCallConfiguration(null, KubernetesConfigurationDefinition.class);
+            }
+            if (config == null) {
+                // if no default then try to find if there configuration in the registry of the given type
+                Set<KubernetesConfigurationDefinition> set = routeContext.getCamelContext().getRegistry().findByType(KubernetesConfigurationDefinition.class);
+                if (set.size() == 1) {
+                    config = set.iterator().next();
+                }
+            }
+
+            if (config == null && configRef == null) {
+                throw new IllegalStateException("The ServiceCall: " + definition + " must be configured before it can be used.");
+            }
+
+            // extract the properties from the configuration from the model
+            // (configRef first so that options from config override them)
+            Map<String, Object> parameters = new HashMap<>();
+            if (configRef != null) {
+                IntrospectionSupport.getProperties(configRef, parameters, null);
+            }
+            if (config != null) {
+                IntrospectionSupport.getProperties(config, parameters, null);
+            }
+            // and set them on the kubernetes configuration class
+            KubernetesConfiguration kc = new KubernetesConfiguration();
+            IntrospectionSupport.setProperties(kc, parameters);
+
+            // use namespace from config
+            String namespace = kc.getNamespace();
+
+            // lookup the load balancer to use (configured on EIP takes precedence vs configured on configuration)
+            ServiceCallLoadBalancer lb = configureLoadBalancer(routeContext, sc);
+            if (lb == null && config != null) {
+                lb = configureLoadBalancer(routeContext, config);
+            }
+            if (lb == null && configRef != null) {
+                lb = configureLoadBalancer(routeContext, configRef);
+            }
+
+            // lookup the server list strategy to use (configured on EIP takes precedence vs configured on configuration)
+            ServiceCallServerListStrategy sl = configureServerListStrategy(routeContext, sc);
+            if (sl == null && config != null) {
+                sl = configureServerListStrategy(routeContext, config);
+            }
+            if (sl == null && configRef != null) {
+                sl = configureServerListStrategy(routeContext, configRef);
+            }
+
+            String lookup = config != null ? config.getLookup() : null;
+            if (lookup == null && configRef != null) {
+                lookup = configRef.getLookup();
+            }
+
+            // the component is used to configure what the default scheme to use (eg camel component name)
+            String component = config != null ? config.getComponent() : null;
+            if (component == null && configRef != null) {
+                component = configRef.getComponent();
+            }
+
+            if ("client".equals(lookup)) {
+                KubernetesClientServiceCallProcessor processor = new KubernetesClientServiceCallProcessor(name, namespace, component, uri, mep, kc);
+                processor.setLoadBalancer(lb);
+                processor.setServerListStrategy(sl);
+                return processor;
+            } else if ("dns".equals(lookup)) {
+                String dnsDomain = config != null ? config.getDnsDomain() : null;
+                if (dnsDomain == null && configRef != null) {
+                    dnsDomain = configRef.getDnsDomain();
+                }
+                return new KubernetesDnsServiceCallProcessor(name, namespace, component, uri, mep, dnsDomain);
+            } else {
+                // environment is default
+                return new KubernetesEnvironmentServiceCallProcessor(name, namespace, component, uri, mep);
+            }
+        } else {
+            return null;
+        }
+    }
+
+    private ServiceCallLoadBalancer configureLoadBalancer(RouteContext routeContext, ServiceCallDefinition sd) {
+        ServiceCallLoadBalancer lb = null;
+
+        if (sd != null) {
+            lb = sd.getLoadBalancer();
+            if (lb == null && sd.getLoadBalancerRef() != null) {
+                String ref = sd.getLoadBalancerRef();
+                // special for ref is referring to built-in
+                if ("random".equalsIgnoreCase(ref)) {
+                    lb = new RandomLoadBalancer();
+                } else if ("roundrobin".equalsIgnoreCase(ref)) {
+                    lb = new RoundRobinBalancer();
+                } else {
+                    lb = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), ref, ServiceCallLoadBalancer.class);
+                }
+            }
+        }
+
+        return lb;
+    }
+
+    private ServiceCallLoadBalancer configureLoadBalancer(RouteContext routeContext, ServiceCallConfigurationDefinition config) {
+        ServiceCallLoadBalancer lb = config.getLoadBalancer();
+        if (lb == null && config.getLoadBalancerRef() != null) {
+            String ref = config.getLoadBalancerRef();
+            // special for ref is referring to built-in
+            if ("random".equalsIgnoreCase(ref)) {
+                lb = new RandomLoadBalancer();
+            } else if ("roundrobin".equalsIgnoreCase(ref)) {
+                lb = new RoundRobinBalancer();
+            } 
else { + lb = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), ref, ServiceCallLoadBalancer.class); + } + } + return lb; + } + + private ServiceCallServerListStrategy configureServerListStrategy(RouteContext routeContext, ServiceCallDefinition sd) { + ServiceCallServerListStrategy lb = null; + + if (sd != null) { + lb = sd.getServerListStrategy(); + if (lb == null && sd.getServerListStrategyRef() != null) { + lb = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), sd.getServerListStrategyRef(), ServiceCallServerListStrategy.class); + } + } + + return lb; + } + + private ServiceCallServerListStrategy configureServerListStrategy(RouteContext routeContext, ServiceCallConfigurationDefinition config) { + ServiceCallServerListStrategy lb = config.getServerListStrategy(); + if (lb == null && config.getServerListStrategyRef() != null) { + String ref = config.getServerListStrategyRef(); + lb = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), ref, ServiceCallServerListStrategy.class); + } + return lb; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/4b81b4ae/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServer.java new file mode 100644 index 0000000..007e5c8 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServer.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.processor; + +import org.apache.camel.spi.ServiceCallServer; + +/** + * Represents a model of a kubernetes server. + */ +public final class KubernetesServer implements ServiceCallServer { + + private final String ip; + private final int port; + + public KubernetesServer(String ip, int port) { + this.ip = ip; + this.port = port; + } + + public String getIp() { + return ip; + } + + public int getPort() { + return port; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/4b81b4ae/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallExpression.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallExpression.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallExpression.java new file mode 100644 index 0000000..dcb770f --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallExpression.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.processor; + +import org.apache.camel.Exchange; +import org.apache.camel.component.kubernetes.KubernetesConstants; +import org.apache.camel.support.ServiceCallExpressionSupport; +import org.apache.camel.util.ExchangeHelper; + +public class KubernetesServiceCallExpression extends ServiceCallExpressionSupport { + + public KubernetesServiceCallExpression(String name, String scheme, String contextPath, String uri) { + super(name, scheme, contextPath, uri); + } + + @Override + public String getIp(Exchange exchange) throws Exception { + return ExchangeHelper.getMandatoryHeader(exchange, KubernetesConstants.KUBERNETES_SERVER_IP, String.class); + } + + @Override + public int getPort(Exchange exchange) throws Exception { + return ExchangeHelper.getMandatoryHeader(exchange, KubernetesConstants.KUBERNETES_SERVER_PORT, int.class); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/4b81b4ae/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallServerListStrategy.java ---------------------------------------------------------------------- diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallServerListStrategy.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallServerListStrategy.java new file mode 100644 index 0000000..f35cfed --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallServerListStrategy.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.kubernetes.processor; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import io.fabric8.kubernetes.api.model.EndpointAddress; +import io.fabric8.kubernetes.api.model.EndpointPort; +import io.fabric8.kubernetes.api.model.EndpointSubset; +import io.fabric8.kubernetes.api.model.Endpoints; +import io.fabric8.kubernetes.client.AutoAdaptableKubernetesClient; +import org.apache.camel.spi.ServiceCallServerListStrategy; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.IOHelper; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Discovers where services are running on which servers in Kubernetes. + */ +public class KubernetesServiceCallServerListStrategy extends ServiceSupport implements ServiceCallServerListStrategy { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesServiceCallServerListStrategy.class); + private static final int FIRST = 0; + + private String name; + private String namespace; + private String portName; + private AutoAdaptableKubernetesClient client; + + public KubernetesServiceCallServerListStrategy(String name, String namespace, String portName, AutoAdaptableKubernetesClient client) { + this.name = name; + this.namespace = namespace; + this.portName = portName; + this.client = client; + } + + @Override + @SuppressWarnings("unchecked") + public Collection getInitialListOfServers() { + return Collections.EMPTY_LIST; + } + + public Collection getUpdatedListOfServers() { + LOG.debug("Discovering endpoints from namespace: {} with name: {}", namespace, name); + Endpoints endpoints = client.endpoints().inNamespace(namespace).withName(name).get(); + List result = new ArrayList<>(); + if (endpoints != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Found {} endpoints in namespace: {} for name: {} and portName: {}", 
endpoints.getSubsets().size(), namespace, name, portName); + } + for (EndpointSubset subset : endpoints.getSubsets()) { + if (subset.getPorts().size() == 1) { + EndpointPort port = subset.getPorts().get(FIRST); + for (EndpointAddress address : subset.getAddresses()) { + result.add(new KubernetesServer(address.getIp(), port.getPort())); + } + } else { + for (EndpointPort port : subset.getPorts()) { + if (ObjectHelper.isEmpty(portName) || portName.endsWith(port.getName())) { + for (EndpointAddress address : subset.getAddresses()) { + result.add(new KubernetesServer(address.getIp(), port.getPort())); + } + } + } + } + } + } + + return result; + } + + @Override + protected void doStart() throws Exception { + // noop + } + + @Override + protected void doStop() throws Exception { + if (client != null) { + IOHelper.close(client); + } + } + + public String toString() { + return "KubernetesServiceDiscovery"; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/4b81b4ae/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java new file mode 100644 index 0000000..1b55e75 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.processor; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Random; + +import org.apache.camel.spi.ServiceCallLoadBalancer; + +public class RandomLoadBalancer implements ServiceCallLoadBalancer { + + @Override + public KubernetesServer chooseServer(Collection servers) { + List list = new ArrayList<>(servers); + int size = list.size(); + int ran = new Random().nextInt(size); + return list.get(ran); + } + + @Override + public String toString() { + return "RandomLoadBalancer"; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/4b81b4ae/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RoundRobinBalancer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RoundRobinBalancer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RoundRobinBalancer.java new file mode 100644 index 0000000..1cfa86d --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RoundRobinBalancer.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.processor; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.camel.spi.ServiceCallLoadBalancer; + +public class RoundRobinBalancer implements ServiceCallLoadBalancer { + + private int counter = -1; + + @Override + public KubernetesServer chooseServer(Collection servers) { + List list = new ArrayList<>(servers); + + int size = list.size(); + if (++counter >= size) { + counter = 0; + } + return list.get(counter); + } + + @Override + public String toString() { + return "RoundRobinBalancer"; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/4b81b4ae/components/camel-kubernetes/src/main/resources/META-INF/services/org/apache/camel/model/ServiceCallDefinition ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/resources/META-INF/services/org/apache/camel/model/ServiceCallDefinition b/components/camel-kubernetes/src/main/resources/META-INF/services/org/apache/camel/model/ServiceCallDefinition new file mode 100644 index 0000000..acf5be8 --- /dev/null +++ b/components/camel-kubernetes/src/main/resources/META-INF/services/org/apache/camel/model/ServiceCallDefinition @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.camel.component.kubernetes.processor.KubernetesProcessorFactory http://git-wip-us.apache.org/repos/asf/camel/blob/4b81b4ae/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/ServiceCallClientRouteTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/ServiceCallClientRouteTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/ServiceCallClientRouteTest.java new file mode 100644 index 0000000..b5e03e3 --- /dev/null +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/ServiceCallClientRouteTest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.processor; + +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.model.remote.KubernetesConfigurationDefinition; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Ignore; +import org.junit.Test; + +@Ignore("Manual test") +public class ServiceCallClientRouteTest extends CamelTestSupport { + + @Test + public void testServiceCall() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(1); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + KubernetesConfigurationDefinition config = new KubernetesConfigurationDefinition(); + config.setMasterUrl("http://172.28.128.80:8080"); + config.setUsername("admin"); + config.setPassword("admin"); + config.setNamespace("default"); + config.setLookup("client"); + // lets use the built-in round robin (random is default) + config.setLoadBalancerRef("roundrobin"); + + // register configuration + context.setServiceCallConfiguration(config); + + from("direct:start") + .serviceCall("cdi-camel-jetty") + .serviceCall("cdi-camel-jetty") + .to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/4b81b4ae/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/ServiceCallEnvironmentRouteTest.java 
---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/ServiceCallEnvironmentRouteTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/ServiceCallEnvironmentRouteTest.java new file mode 100644 index 0000000..9278b57 --- /dev/null +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/ServiceCallEnvironmentRouteTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.kubernetes.processor; + +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.model.remote.KubernetesConfigurationDefinition; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Ignore; +import org.junit.Test; + +@Ignore("Manual test") +public class ServiceCallEnvironmentRouteTest extends CamelTestSupport { + + @Test + public void testServiceCall() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(1); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + KubernetesConfigurationDefinition config = new KubernetesConfigurationDefinition(); + config.setLookup("environment"); + + // register configuration + context.setServiceCallConfiguration(config); + + from("direct:start") + .serviceCall("cdi-camel-jetty") + .serviceCall("cdi-camel-jetty") + .to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/4b81b4ae/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/SpringServiceCallClientRouteTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/SpringServiceCallClientRouteTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/SpringServiceCallClientRouteTest.java new file mode 100644 index 0000000..3e8fcba3 --- /dev/null +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/SpringServiceCallClientRouteTest.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + 
* contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.processor; + +import org.apache.camel.test.spring.CamelSpringTestSupport; +import org.junit.Ignore; +import org.junit.Test; +import org.springframework.context.support.AbstractApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +@Ignore("Manual test") +public class SpringServiceCallClientRouteTest extends CamelSpringTestSupport { + + @Override + protected AbstractApplicationContext createApplicationContext() { + return new ClassPathXmlApplicationContext("org/apache/camel/component/kubernetes/processor/SpringServiceCallClientRouteTest.xml"); + } + + @Test + public void testServiceCall() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(1); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/4b81b4ae/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/SpringServiceCallEnvironmentRouteTest.java ---------------------------------------------------------------------- diff --git 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/SpringServiceCallEnvironmentRouteTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/SpringServiceCallEnvironmentRouteTest.java new file mode 100644 index 0000000..bd09c31 --- /dev/null +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/SpringServiceCallEnvironmentRouteTest.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.kubernetes.processor; + +import org.apache.camel.test.spring.CamelSpringTestSupport; +import org.junit.Ignore; +import org.junit.Test; +import org.springframework.context.support.AbstractApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +@Ignore("Manual test") +public class SpringServiceCallEnvironmentRouteTest extends CamelSpringTestSupport { + + @Override + protected AbstractApplicationContext createApplicationContext() { + return new ClassPathXmlApplicationContext("org/apache/camel/component/kubernetes/processor/SpringServiceCallEnvironmentRouteTest.xml"); + } + + @Test + public void testServiceCall() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(1); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/4b81b4ae/components/camel-kubernetes/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/resources/log4j.properties b/components/camel-kubernetes/src/test/resources/log4j.properties index 255fe5c..f88e05c 100644 --- a/components/camel-kubernetes/src/test/resources/log4j.properties +++ b/components/camel-kubernetes/src/test/resources/log4j.properties @@ -20,7 +20,7 @@ # log4j.rootLogger=INFO, file -#log4j.logger.org.apache.camel=DEBUG +log4j.logger.org.apache.camel.component.kubernetes=DEBUG # CONSOLE appender not used by default log4j.appender.out=org.apache.log4j.ConsoleAppender http://git-wip-us.apache.org/repos/asf/camel/blob/4b81b4ae/components/camel-kubernetes/src/test/resources/org/apache/camel/component/kubernetes/processor/SpringServiceCallClientRouteTest.xml ---------------------------------------------------------------------- diff --git 
a/components/camel-kubernetes/src/test/resources/org/apache/camel/component/kubernetes/processor/SpringServiceCallClientRouteTest.xml b/components/camel-kubernetes/src/test/resources/org/apache/camel/component/kubernetes/processor/SpringServiceCallClientRouteTest.xml new file mode 100644 index 0000000..87162d9 --- /dev/null +++ b/components/camel-kubernetes/src/test/resources/org/apache/camel/component/kubernetes/processor/SpringServiceCallClientRouteTest.xml @@ -0,0 +1,39 @@ + + + + + + + + + + + + + + + + + + http://git-wip-us.apache.org/repos/asf/camel/blob/4b81b4ae/components/camel-kubernetes/src/test/resources/org/apache/camel/component/kubernetes/processor/SpringServiceCallEnvironmentRouteTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/resources/org/apache/camel/component/kubernetes/processor/SpringServiceCallEnvironmentRouteTest.xml b/components/camel-kubernetes/src/test/resources/org/apache/camel/component/kubernetes/processor/SpringServiceCallEnvironmentRouteTest.xml new file mode 100644 index 0000000..6f2f028 --- /dev/null +++ b/components/camel-kubernetes/src/test/resources/org/apache/camel/component/kubernetes/processor/SpringServiceCallEnvironmentRouteTest.xml @@ -0,0 +1,38 @@ + + + + + + + + + + + + + + + + + + http://git-wip-us.apache.org/repos/asf/camel/blob/4b81b4ae/components/camel-ribbon/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-ribbon/pom.xml b/components/camel-ribbon/pom.xml new file mode 100644 index 0000000..94f68fe --- /dev/null +++ b/components/camel-ribbon/pom.xml @@ -0,0 +1,89 @@ + + + + + 4.0.0 + + + org.apache.camel + components + 2.18-SNAPSHOT + + + camel-ribbon + jar + Camel :: Ribbon + + + org.apache.camel.component.ribbon.* + + + + + + org.apache.camel + camel-core + + + + com.netflix.ribbon + ribbon-core + ${ribbon-version} + + + com.netflix.ribbon + ribbon-loadbalancer + 
${ribbon-version} + + + + + org.apache.camel + camel-test-spring + test + + + org.apache.camel + camel-http + test + + + org.apache.camel + camel-jetty + test + + + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-log4j12 + test + + + log4j + log4j + test + + + + http://git-wip-us.apache.org/repos/asf/camel/blob/4b81b4ae/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/RibbonConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/RibbonConfiguration.java b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/RibbonConfiguration.java new file mode 100644 index 0000000..4931a5b --- /dev/null +++ b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/RibbonConfiguration.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.ribbon; + +import com.netflix.loadbalancer.IPing; +import com.netflix.loadbalancer.IRule; +/** + * Mutable holder for the Ribbon related settings: namespace, username, password, + * and the {@code com.netflix.loadbalancer} {@code IRule} and {@code IPing} instances. + * The class only stores and returns the values; it performs no validation and assigns no defaults. + * NOTE(review): presumably rule/ping are handed to the Ribbon load balancer by the code that consumes this class - confirm there. + */ +public class RibbonConfiguration { + /* None of the fields has an initializer, so each getter returns null until the matching setter has been called. */ + private String namespace; + private String username; + private String password; + private IRule rule; + private IPing ping; + + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public IRule getRule() { + return rule; + } + + public void setRule(IRule rule) { + this.rule = rule; + } + + public IPing getPing() { + return ping; + } + + public void setPing(IPing ping) { + this.ping = ping; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/4b81b4ae/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/RibbonConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/RibbonConstants.java b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/RibbonConstants.java new file mode 100644 index 0000000..705d69f --- /dev/null +++ b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/RibbonConstants.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ribbon; +/** + * String constants for the camel-ribbon component; the inline comment groups them under the Service Call EIP. + * As interface fields they are implicitly public, static and final. + * NOTE(review): presumably used as header/property names carrying the selected server's IP and port - confirm against the code that reads them. + */ +public interface RibbonConstants { + + // Service Call EIP + String RIBBON_SERVER_IP = "CamelRibbonServerIp"; + String RIBBON_SERVER_PORT = "CamelRibbonServerPort"; + +}