Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1D74318D8E for ; Fri, 11 Dec 2015 11:55:37 +0000 (UTC) Received: (qmail 22188 invoked by uid 500); 11 Dec 2015 11:55:37 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 22137 invoked by uid 500); 11 Dec 2015 11:55:36 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 22126 invoked by uid 99); 11 Dec 2015 11:55:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Dec 2015 11:55:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A5B1ADFCE0; Fri, 11 Dec 2015 11:55:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hekonsek@apache.org To: commits@camel.apache.org Message-Id: <48a033f006694b4cac428c015243a6b4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: camel git commit: Renamed TypedRddCallback to ConvertingRddCallback. Date: Fri, 11 Dec 2015 11:55:36 +0000 (UTC) Repository: camel Updated Branches: refs/heads/master 930117ca6 -> be05f73e0 Renamed TypedRddCallback to ConvertingRddCallback. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/be05f73e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/be05f73e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/be05f73e Branch: refs/heads/master Commit: be05f73e0b14ca5767330c1dac4dbf73cdb8dc26 Parents: 930117c Author: Henryk Konsek Authored: Fri Dec 11 12:55:19 2015 +0100 Committer: Henryk Konsek Committed: Fri Dec 11 12:55:32 2015 +0100 ---------------------------------------------------------------------- .../component/spark/ConvertingRddCallback.java | 49 ++++++++++++++++++++ .../component/spark/SparkTransformation.java | 23 --------- .../camel/component/spark/TypedRddCallback.java | 43 ----------------- .../component/spark/SparkProducerTest.java | 2 +- 4 files changed, 50 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/be05f73e/components/camel-spark/src/main/java/org/apache/camel/component/spark/ConvertingRddCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/ConvertingRddCallback.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/ConvertingRddCallback.java new file mode 100644 index 0000000..ac75f38 --- /dev/null +++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/ConvertingRddCallback.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.spark; + +import static java.lang.String.format; + +import org.apache.camel.CamelContext; +import org.apache.spark.api.java.AbstractJavaRDDLike; + +public abstract class ConvertingRddCallback implements RddCallback { + + private final CamelContext camelContext; + + private final Class[] payloadsTypes; + + public ConvertingRddCallback(CamelContext camelContext, Class... payloadsTypes) { + this.camelContext = camelContext; + this.payloadsTypes = payloadsTypes; + } + + @Override + public T onRdd(AbstractJavaRDDLike rdd, Object... payloads) { + if (payloads.length != payloadsTypes.length) { + String message = format("Received %d payloads, but expected %d.", payloads.length, payloadsTypes.length); + throw new IllegalArgumentException(message); + } + for (int i = 0; i < payloads.length; i++) { + payloads[i] = camelContext.getTypeConverter().convertTo(payloadsTypes[i], payloads[i]); + } + return doOnRdd(rdd, payloads); + } + + public abstract T doOnRdd(AbstractJavaRDDLike rdd, Object... payloads); + +} http://git-wip-us.apache.org/repos/asf/camel/blob/be05f73e/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkTransformation.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkTransformation.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkTransformation.java deleted file mode 100644 index 4a88e8e..0000000 --- a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkTransformation.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.spark; - -public enum SparkTransformation { - - FILTER, MAP - -} http://git-wip-us.apache.org/repos/asf/camel/blob/be05f73e/components/camel-spark/src/main/java/org/apache/camel/component/spark/TypedRddCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/TypedRddCallback.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/TypedRddCallback.java deleted file mode 100644 index cb97c81..0000000 --- a/components/camel-spark/src/main/java/org/apache/camel/component/spark/TypedRddCallback.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.spark; - -import org.apache.camel.CamelContext; -import org.apache.spark.api.java.AbstractJavaRDDLike; - -public abstract class TypedRddCallback implements RddCallback { - - private final CamelContext camelContext; - - private final Class[] payloadsTypes; - - public TypedRddCallback(CamelContext camelContext, Class[] payloadsTypes) { - this.camelContext = camelContext; - this.payloadsTypes = payloadsTypes; - } - - @Override - public T onRdd(AbstractJavaRDDLike rdd, Object... payloads) { - for (int i = 0; i < payloads.length; i++) { - payloads[i] = camelContext.getTypeConverter().convertTo(payloadsTypes[i], payloads[i]); - } - return doOnRdd(rdd, payloads); - } - - public abstract T doOnRdd(AbstractJavaRDDLike rdd, Object... payloads); - -} http://git-wip-us.apache.org/repos/asf/camel/blob/be05f73e/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java b/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java index bf6f706..4617569 100644 --- a/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java +++ b/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java @@ -124,7 +124,7 @@ public class SparkProducerTest extends CamelTestSupport { @Test public void shouldExecuteRddCallbackWithTypedPayloads() { - TypedRddCallback rddCallback = new TypedRddCallback(context, new Class[]{int.class, int.class}) { + ConvertingRddCallback rddCallback = new ConvertingRddCallback(context, int.class, int.class) { @Override public Long doOnRdd(AbstractJavaRDDLike rdd, Object... payloads) { return rdd.count() * (int) payloads[0] * (int) payloads[1];