Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8B30619209 for ; Wed, 27 Apr 2016 05:37:42 +0000 (UTC) Received: (qmail 32702 invoked by uid 500); 27 Apr 2016 05:37:42 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 32657 invoked by uid 500); 27 Apr 2016 05:37:42 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 32556 invoked by uid 99); 27 Apr 2016 05:37:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 27 Apr 2016 05:37:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 46866DFF70; Wed, 27 Apr 2016 05:37:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Date: Wed, 27 Apr 2016 05:37:43 -0000 Message-Id: <74b1ea6749bd4f18b6679139e7d52458@git.apache.org> In-Reply-To: <76dfb53212a84727a44d052d89f363de@git.apache.org> References: <76dfb53212a84727a44d052d89f363de@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] camel git commit: Renamed the producer name from Zookeeper to ZooKeeper Renamed the producer name from Zookeeper to ZooKeeper Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9c076c3d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9c076c3d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9c076c3d Branch: refs/heads/master Commit: 9c076c3d573435a71bbbe560717930b7dc377a70 Parents: 3c3202e Author: Tatsuya Hoshino Authored: Wed Apr 27 08:04:41 2016 +0900 Committer: Claus Ibsen Committed: Wed Apr 27 07:37:31 2016 +0200 ---------------------------------------------------------------------- .../component/zookeeper/ZooKeeperEndpoint.java | 4 +- .../component/zookeeper/ZooKeeperProducer.java | 266 +++++++++++++++++++ .../component/zookeeper/ZookeeperProducer.java | 266 ------------------- .../zookeeper/ZooKeeperProducerTest.java | 167 ++++++++++++ .../zookeeper/ZookeeperProducerTest.java | 167 ------------ 5 files changed, 435 insertions(+), 435 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/9c076c3d/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperEndpoint.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperEndpoint.java index e85876d..fa646f6 100644 --- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperEndpoint.java +++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperEndpoint.java @@ -45,7 +45,7 @@ public class ZooKeeperEndpoint extends DefaultEndpoint { } public Producer createProducer() throws Exception { - return new ZookeeperProducer(this); + return new ZooKeeperProducer(this); } public Consumer createConsumer(Processor processor) throws Exception { @@ -167,7 +167,7 @@ public class ZooKeeperEndpoint extends DefaultEndpoint { public void setSendEmptyMessageOnDelete(boolean sendEmptyMessageOnDelete) { getConfiguration().setSendEmptyMessageOnDelete(sendEmptyMessageOnDelete); } - + @Override protected void doStop() throws Exception { if (connectionManager != null) { http://git-wip-us.apache.org/repos/asf/camel/blob/9c076c3d/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperProducer.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperProducer.java new file mode 100644 index 0000000..e5428af --- /dev/null +++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperProducer.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.zookeeper; + +import static java.lang.String.format; + +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.component.zookeeper.operations.CreateOperation; +import org.apache.camel.component.zookeeper.operations.DeleteOperation; +import org.apache.camel.component.zookeeper.operations.GetChildrenOperation; +import org.apache.camel.component.zookeeper.operations.OperationResult; +import org.apache.camel.component.zookeeper.operations.SetDataOperation; +import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.util.ExchangeHelper; + +import org.apache.zookeeper.AsyncCallback.StatCallback; +import org.apache.zookeeper.AsyncCallback.VoidCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; + +import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getAclListFromMessage; +import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getCreateMode; +import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getCreateModeFromString; +import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getNodeFromMessage; +import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getPayloadFromExchange; +import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getVersionFromMessage; + +/** + * ZooKeeperProducer attempts to set the content of nodes in the + * {@link ZooKeeper} cluster with the payloads of the of the exchanges it + * receives. + */ +@SuppressWarnings("rawtypes") +public class ZooKeeperProducer extends DefaultProducer { + public static final String ZK_OPERATION_WRITE = "WRITE"; + public static final String ZK_OPERATION_DELETE = "DELETE"; + + private final ZooKeeperConfiguration configuration; + private ZooKeeperConnectionManager zkm; + private ZooKeeper connection; + + public ZooKeeperProducer(ZooKeeperEndpoint endpoint) { + super(endpoint); + this.configuration = endpoint.getConfiguration(); + this.zkm = endpoint.getConnectionManager(); + } + + public void process(Exchange exchange) throws Exception { + + ProductionContext context = new ProductionContext(connection, exchange); + + String operation = exchange.getIn().getHeader(ZooKeeperMessage.ZOOKEEPER_OPERATION, String.class); + boolean isDelete = ZK_OPERATION_DELETE.equals(operation); + + if (ExchangeHelper.isOutCapable(exchange)) { + if (isDelete) { + if (log.isDebugEnabled()) { + log.debug(format("Deleting znode '%s', waiting for confirmation", context.node)); + } + + OperationResult result = synchronouslyDelete(context); + if (configuration.isListChildren()) { + result = listChildren(context); + } + updateExchangeWithResult(context, result); + } else { + if (log.isDebugEnabled()) { + log.debug(format("Storing data to znode '%s', waiting for confirmation", context.node)); + } + + OperationResult result = synchronouslySetData(context); + if (configuration.isListChildren()) { + result = listChildren(context); + } + updateExchangeWithResult(context, result); + } + } else { + if (isDelete) { + asynchronouslyDeleteNode(connection, context); + } else { + asynchronouslySetDataOnNode(connection, context); + } + } + + } + + @Override + protected void doStart() throws Exception { + connection = zkm.getConnection(); + if (log.isTraceEnabled()) { + log.trace(String.format("Starting zookeeper producer of '%s'", configuration.getPath())); + } + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + if (log.isTraceEnabled()) { + log.trace(String.format("Shutting down zookeeper producer of '%s'", configuration.getPath())); + } + zkm.shutdown(); + } + + private void asynchronouslyDeleteNode(ZooKeeper connection, ProductionContext context) { + if (log.isDebugEnabled()) { + log.debug(format("Deleting node '%s', not waiting for confirmation", context.node)); + } + connection.delete(context.node, context.version, new AsyncDeleteCallback(), context); + + } + + private void asynchronouslySetDataOnNode(ZooKeeper connection, ProductionContext context) { + if (log.isDebugEnabled()) { + log.debug(format("Storing data to node '%s', not waiting for confirmation", context.node)); + } + connection.setData(context.node, context.payload, context.version, new AsyncSetDataCallback(), context); + } + + private void updateExchangeWithResult(ProductionContext context, OperationResult result) { + ZooKeeperMessage out = new ZooKeeperMessage(context.node, result.getStatistics(), context.in.getHeaders()); + if (result.isOk()) { + out.setBody(result.getResult()); + } else { + context.exchange.setException(result.getException()); + } + + context.exchange.setOut(out); + } + + private OperationResult listChildren(ProductionContext context) throws Exception { + return new GetChildrenOperation(context.connection, configuration.getPath()).get(); + } + + /** Simple container to avoid passing all these around as parameters */ + private class ProductionContext { + ZooKeeper connection; + Exchange exchange; + Message in; + byte[] payload; + int version; + String node; + + public ProductionContext(ZooKeeper connection, Exchange exchange) { + this.connection = connection; + this.exchange = exchange; + this.in = exchange.getIn(); + this.node = getNodeFromMessage(in, configuration.getPath()); + this.version = getVersionFromMessage(in); + this.payload = getPayloadFromExchange(exchange); + } + } + + private class AsyncSetDataCallback implements StatCallback { + + public void processResult(int rc, String node, Object ctx, Stat statistics) { + if (Code.NONODE.equals(Code.get(rc))) { + if (configuration.isCreate()) { + log.warn(format("Node '%s' did not exist, creating it...", node)); + ProductionContext context = (ProductionContext)ctx; + OperationResult result = null; + try { + result = createNode(context); + } catch (Exception e) { + log.error(format("Error trying to create node '%s'", node), e); + } + + if (result == null || !result.isOk()) { + log.error(format("Error creating node '%s'", node), result.getException()); + } + } + } else { + logStoreComplete(node, statistics); + } + } + } + + private class AsyncDeleteCallback implements VoidCallback { + @Override + public void processResult(int rc, String path, Object ctx) { + if (log.isDebugEnabled()) { + if (log.isTraceEnabled()) { + log.trace(format("Removed data node '%s'", path)); + } else { + log.debug(format("Removed data node '%s'", path)); + } + } + } + } + + private OperationResult createNode(ProductionContext ctx) throws Exception { + CreateOperation create = new CreateOperation(ctx.connection, ctx.node); + create.setPermissions(getAclListFromMessage(ctx.exchange.getIn())); + + CreateMode mode = null; + String modeString = configuration.getCreateMode(); + if (modeString != null) { + try { + mode = getCreateModeFromString(modeString, CreateMode.EPHEMERAL); + } catch (Exception e) { } + } else { + mode = getCreateMode(ctx.exchange.getIn(), CreateMode.EPHEMERAL); + } + create.setCreateMode(mode == null ? CreateMode.EPHEMERAL : mode); + create.setData(ctx.payload); + return create.get(); + } + + /** + * Tries to set the data first and if a no node error is received then an + * attempt will be made to create it instead. + */ + private OperationResult synchronouslySetData(ProductionContext ctx) throws Exception { + + SetDataOperation setData = new SetDataOperation(ctx.connection, ctx.node, ctx.payload); + setData.setVersion(ctx.version); + + OperationResult result = setData.get(); + + if (!result.isOk() && configuration.isCreate() && result.failedDueTo(Code.NONODE)) { + log.warn(format("Node '%s' did not exist, creating it.", ctx.node)); + result = createNode(ctx); + } + return result; + } + + private OperationResult synchronouslyDelete(ProductionContext ctx) throws Exception { + DeleteOperation setData = new DeleteOperation(ctx.connection, ctx.node); + setData.setVersion(ctx.version); + + OperationResult result = setData.get(); + + if (!result.isOk() && configuration.isCreate() && result.failedDueTo(Code.NONODE)) { + log.warn(format("Node '%s' did not exist, creating it.", ctx.node)); + result = createNode(ctx); + } + return result; + } + + + private void logStoreComplete(String path, Stat statistics) { + if (log.isDebugEnabled()) { + if (log.isTraceEnabled()) { + log.trace(format("Stored data to node '%s', and receive statistics %s", path, statistics)); + } else { + log.debug(format("Stored data to node '%s'", path)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/9c076c3d/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java deleted file mode 100644 index 2280049..0000000 --- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java +++ /dev/null @@ -1,266 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.zookeeper; - -import static java.lang.String.format; - -import org.apache.camel.Exchange; -import org.apache.camel.Message; -import org.apache.camel.component.zookeeper.operations.CreateOperation; -import org.apache.camel.component.zookeeper.operations.DeleteOperation; -import org.apache.camel.component.zookeeper.operations.GetChildrenOperation; -import org.apache.camel.component.zookeeper.operations.OperationResult; -import org.apache.camel.component.zookeeper.operations.SetDataOperation; -import org.apache.camel.impl.DefaultProducer; -import org.apache.camel.util.ExchangeHelper; - -import org.apache.zookeeper.AsyncCallback.StatCallback; -import org.apache.zookeeper.AsyncCallback.VoidCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException.Code; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.Stat; - -import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getAclListFromMessage; -import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getCreateMode; -import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getCreateModeFromString; -import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getNodeFromMessage; -import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getPayloadFromExchange; -import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getVersionFromMessage; - -/** - * ZookeeperProducer attempts to set the content of nodes in the - * {@link ZooKeeper} cluster with the payloads of the of the exchanges it - * receives. - */ -@SuppressWarnings("rawtypes") -public class ZookeeperProducer extends DefaultProducer { - public static final String ZK_OPERATION_WRITE = "WRITE"; - public static final String ZK_OPERATION_DELETE = "DELETE"; - - private final ZooKeeperConfiguration configuration; - private ZooKeeperConnectionManager zkm; - private ZooKeeper connection; - - public ZookeeperProducer(ZooKeeperEndpoint endpoint) { - super(endpoint); - this.configuration = endpoint.getConfiguration(); - this.zkm = endpoint.getConnectionManager(); - } - - public void process(Exchange exchange) throws Exception { - - ProductionContext context = new ProductionContext(connection, exchange); - - String operation = exchange.getIn().getHeader(ZooKeeperMessage.ZOOKEEPER_OPERATION, String.class); - boolean isDelete = ZK_OPERATION_DELETE.equals(operation); - - if (ExchangeHelper.isOutCapable(exchange)) { - if (isDelete) { - if (log.isDebugEnabled()) { - log.debug(format("Deleting znode '%s', waiting for confirmation", context.node)); - } - - OperationResult result = synchronouslyDelete(context); - if (configuration.isListChildren()) { - result = listChildren(context); - } - updateExchangeWithResult(context, result); - } else { - if (log.isDebugEnabled()) { - log.debug(format("Storing data to znode '%s', waiting for confirmation", context.node)); - } - - OperationResult result = synchronouslySetData(context); - if (configuration.isListChildren()) { - result = listChildren(context); - } - updateExchangeWithResult(context, result); - } - } else { - if (isDelete) { - asynchronouslyDeleteNode(connection, context); - } else { - asynchronouslySetDataOnNode(connection, context); - } - } - - } - - @Override - protected void doStart() throws Exception { - connection = zkm.getConnection(); - if (log.isTraceEnabled()) { - log.trace(String.format("Starting zookeeper producer of '%s'", configuration.getPath())); - } - } - - @Override - protected void doStop() throws Exception { - super.doStop(); - if (log.isTraceEnabled()) { - log.trace(String.format("Shutting down zookeeper producer of '%s'", configuration.getPath())); - } - zkm.shutdown(); - } - - private void asynchronouslyDeleteNode(ZooKeeper connection, ProductionContext context) { - if (log.isDebugEnabled()) { - log.debug(format("Deleting node '%s', not waiting for confirmation", context.node)); - } - connection.delete(context.node, context.version, new AsyncDeleteCallback(), context); - - } - - private void asynchronouslySetDataOnNode(ZooKeeper connection, ProductionContext context) { - if (log.isDebugEnabled()) { - log.debug(format("Storing data to node '%s', not waiting for confirmation", context.node)); - } - connection.setData(context.node, context.payload, context.version, new AsyncSetDataCallback(), context); - } - - private void updateExchangeWithResult(ProductionContext context, OperationResult result) { - ZooKeeperMessage out = new ZooKeeperMessage(context.node, result.getStatistics(), context.in.getHeaders()); - if (result.isOk()) { - out.setBody(result.getResult()); - } else { - context.exchange.setException(result.getException()); - } - - context.exchange.setOut(out); - } - - private OperationResult listChildren(ProductionContext context) throws Exception { - return new GetChildrenOperation(context.connection, configuration.getPath()).get(); - } - - /** Simple container to avoid passing all these around as parameters */ - private class ProductionContext { - ZooKeeper connection; - Exchange exchange; - Message in; - byte[] payload; - int version; - String node; - - public ProductionContext(ZooKeeper connection, Exchange exchange) { - this.connection = connection; - this.exchange = exchange; - this.in = exchange.getIn(); - this.node = getNodeFromMessage(in, configuration.getPath()); - this.version = getVersionFromMessage(in); - this.payload = getPayloadFromExchange(exchange); - } - } - - private class AsyncSetDataCallback implements StatCallback { - - public void processResult(int rc, String node, Object ctx, Stat statistics) { - if (Code.NONODE.equals(Code.get(rc))) { - if (configuration.isCreate()) { - log.warn(format("Node '%s' did not exist, creating it...", node)); - ProductionContext context = (ProductionContext)ctx; - OperationResult result = null; - try { - result = createNode(context); - } catch (Exception e) { - log.error(format("Error trying to create node '%s'", node), e); - } - - if (result == null || !result.isOk()) { - log.error(format("Error creating node '%s'", node), result.getException()); - } - } - } else { - logStoreComplete(node, statistics); - } - } - } - - private class AsyncDeleteCallback implements VoidCallback { - @Override - public void processResult(int rc, String path, Object ctx) { - if (log.isDebugEnabled()) { - if (log.isTraceEnabled()) { - log.trace(format("Removed data node '%s'", path)); - } else { - log.debug(format("Removed data node '%s'", path)); - } - } - } - } - - private OperationResult createNode(ProductionContext ctx) throws Exception { - CreateOperation create = new CreateOperation(ctx.connection, ctx.node); - create.setPermissions(getAclListFromMessage(ctx.exchange.getIn())); - - CreateMode mode = null; - String modeString = configuration.getCreateMode(); - if (modeString != null) { - try { - mode = getCreateModeFromString(modeString, CreateMode.EPHEMERAL); - } catch (Exception e) { } - } else { - mode = getCreateMode(ctx.exchange.getIn(), CreateMode.EPHEMERAL); - } - create.setCreateMode(mode == null ? CreateMode.EPHEMERAL : mode); - create.setData(ctx.payload); - return create.get(); - } - - /** - * Tries to set the data first and if a no node error is received then an - * attempt will be made to create it instead. - */ - private OperationResult synchronouslySetData(ProductionContext ctx) throws Exception { - - SetDataOperation setData = new SetDataOperation(ctx.connection, ctx.node, ctx.payload); - setData.setVersion(ctx.version); - - OperationResult result = setData.get(); - - if (!result.isOk() && configuration.isCreate() && result.failedDueTo(Code.NONODE)) { - log.warn(format("Node '%s' did not exist, creating it.", ctx.node)); - result = createNode(ctx); - } - return result; - } - - private OperationResult synchronouslyDelete(ProductionContext ctx) throws Exception { - DeleteOperation setData = new DeleteOperation(ctx.connection, ctx.node); - setData.setVersion(ctx.version); - - OperationResult result = setData.get(); - - if (!result.isOk() && configuration.isCreate() && result.failedDueTo(Code.NONODE)) { - log.warn(format("Node '%s' did not exist, creating it.", ctx.node)); - result = createNode(ctx); - } - return result; - } - - - private void logStoreComplete(String path, Stat statistics) { - if (log.isDebugEnabled()) { - if (log.isTraceEnabled()) { - log.trace(format("Stored data to node '%s', and receive statistics %s", path, statistics)); - } else { - log.debug(format("Stored data to node '%s'", path)); - } - } - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/9c076c3d/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperProducerTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperProducerTest.java new file mode 100644 index 0000000..ced212f --- /dev/null +++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperProducerTest.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.zookeeper; + +import java.util.List; + +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Message; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.zookeeper.operations.GetChildrenOperation; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.Stat; + +import org.junit.Test; + +import static org.apache.camel.component.zookeeper.ZooKeeperMessage.ZOOKEEPER_CREATE_MODE; +import static org.apache.camel.component.zookeeper.ZooKeeperMessage.ZOOKEEPER_NODE; +import static org.apache.camel.component.zookeeper.ZooKeeperMessage.ZOOKEEPER_OPERATION; + +public class ZooKeeperProducerTest extends ZooKeeperTestSupport { + + private String zookeeperUri; + private String testPayload = "TestPayload"; + + @Override + protected RouteBuilder[] createRouteBuilders() throws Exception { + return new RouteBuilder[] {new RouteBuilder() { + public void configure() throws Exception { + zookeeperUri = "zookeeper://localhost:" + getServerPort() + "/node?create=true"; + from("direct:roundtrip").to(zookeeperUri).to("mock:producer-out"); + from(zookeeperUri).to("mock:consumed-from-node"); + } + }, new RouteBuilder() { + public void configure() throws Exception { + from("direct:no-create-fails-set").to("zookeeper://localhost:" + getServerPort() + "/doesnotexist"); + } + }, new RouteBuilder() { + public void configure() throws Exception { + from("direct:node-from-header").to("zookeeper://localhost:" + getServerPort() + "/notset?create=true"); + from("zookeeper://localhost:" + getServerPort() + "/set?create=true").to("mock:consumed-from-set-node"); + } + }, new RouteBuilder() { + public void configure() throws Exception { + from("direct:create-mode").to("zookeeper://localhost:" + getServerPort() + "/persistent?create=true&createMode=PERSISTENT").to("mock:create-mode"); + } + }, new RouteBuilder() { + public void configure() throws Exception { + from("direct:delete").to("zookeeper://localhost:39913/to-be-deleted").to("mock:delete"); + } + }}; + } + + @Test + public void testRoundtripOfDataToAndFromZnode() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:consumed-from-node"); + MockEndpoint pipeline = getMockEndpoint("mock:producer-out"); + mock.expectedMessageCount(1); + pipeline.expectedMessageCount(1); + + Exchange e = createExchangeWithBody(testPayload); + e.setPattern(ExchangePattern.InOut); + template.send("direct:roundtrip", e); + + assertMockEndpointsSatisfied(); + } + + @Test + public void testAsyncRoundtripOfDataToAndFromZnode() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:consumed-from-node"); + mock.expectedMessageCount(1); + + Exchange e = createExchangeWithBody(testPayload); + template.send("direct:roundtrip", e); + + assertMockEndpointsSatisfied(); + } + + @Test + public void setUsingCreateModeFromHeader() throws Exception { + client.createPersistent("/modes-test", "parent for modes"); + for (CreateMode mode : CreateMode.values()) { + Exchange exchange = createExchangeWithBody(testPayload); + exchange.getIn().setHeader(ZOOKEEPER_CREATE_MODE, mode); + exchange.getIn().setHeader(ZOOKEEPER_NODE, "/modes-test/" + mode); + exchange.setPattern(ExchangePattern.InOut); + template.send("direct:node-from-header", exchange); + } + GetChildrenOperation listing = new GetChildrenOperation(getConnection(), "/modes-test"); + assertEquals(CreateMode.values().length, listing.get().getResult().size()); + } + + @Test + public void createWithOtherCreateMode() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:create-mode"); + mock.expectedMessageCount(1); + + Exchange e = createExchangeWithBody(testPayload); + e.setPattern(ExchangePattern.InOut); + + template.send("direct:create-mode", e); + + assertMockEndpointsSatisfied(); + + Stat s = mock.getReceivedExchanges().get(0).getIn().getHeader(ZooKeeperMessage.ZOOKEEPER_STATISTICS, Stat.class); + assertEquals(s.getEphemeralOwner(), 0); + } + + @Test + public void deleteNode() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:delete"); + mock.expectedMessageCount(1); + + client.createPersistent("/to-be-deleted", "to be deleted"); + Exchange e = createExchangeWithBody(null); + e.setPattern(ExchangePattern.InOut); + e.getIn().setHeader(ZOOKEEPER_OPERATION, "DELETE"); + template.send("direct:delete", e); + + assertMockEndpointsSatisfied(); + + assertNull(client.getConnection().exists("/to-be-deleted", false)); + } + + @Test + public void setAndGetListing() throws Exception { + client.createPersistent("/set-listing", "parent for set and list test"); + + Exchange exchange = createExchangeWithBody(testPayload); + exchange.getIn().setHeader(ZOOKEEPER_NODE, "/set-listing/firstborn"); + exchange.setPattern(ExchangePattern.InOut); + template.send("zookeeper://localhost:" + getServerPort() + "/set-listing?create=true&listChildren=true", exchange); + List children = exchange.getOut().getMandatoryBody(List.class); + assertEquals(1, children.size()); + assertEquals("firstborn", children.get(0)); + } + + @Test + public void testZookeeperMessage() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:consumed-from-node"); + mock.expectedMessageCount(1); + + Exchange exchange = createExchangeWithBody(testPayload); + template.send("direct:roundtrip", exchange); + + assertMockEndpointsSatisfied(); + + Message received = mock.getReceivedExchanges().get(0).getIn(); + assertEquals("/node", ZooKeeperMessage.getPath(received)); + assertNotNull(ZooKeeperMessage.getStatistics(received)); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/9c076c3d/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperProducerTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperProducerTest.java deleted file mode 100644 index 3dcb493..0000000 --- a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperProducerTest.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.zookeeper; - -import java.util.List; - -import org.apache.camel.Exchange; -import org.apache.camel.ExchangePattern; -import org.apache.camel.Message; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.component.zookeeper.operations.GetChildrenOperation; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.data.Stat; - -import org.junit.Test; - -import static org.apache.camel.component.zookeeper.ZooKeeperMessage.ZOOKEEPER_CREATE_MODE; -import static org.apache.camel.component.zookeeper.ZooKeeperMessage.ZOOKEEPER_NODE; -import static org.apache.camel.component.zookeeper.ZooKeeperMessage.ZOOKEEPER_OPERATION; - -public class ZookeeperProducerTest extends ZooKeeperTestSupport { - - private String zookeeperUri; - private String testPayload = "TestPayload"; - - @Override - protected RouteBuilder[] createRouteBuilders() throws Exception { - return new RouteBuilder[] {new RouteBuilder() { - public void configure() throws Exception { - zookeeperUri = "zookeeper://localhost:" + getServerPort() + "/node?create=true"; - from("direct:roundtrip").to(zookeeperUri).to("mock:producer-out"); - from(zookeeperUri).to("mock:consumed-from-node"); - } - }, new RouteBuilder() { - public void configure() throws Exception { - from("direct:no-create-fails-set").to("zookeeper://localhost:" + getServerPort() + "/doesnotexist"); - } - }, new RouteBuilder() { - public void configure() throws Exception { - from("direct:node-from-header").to("zookeeper://localhost:" + getServerPort() + "/notset?create=true"); - from("zookeeper://localhost:" + getServerPort() + "/set?create=true").to("mock:consumed-from-set-node"); - } - }, new RouteBuilder() { - public void configure() throws Exception { - from("direct:create-mode").to("zookeeper://localhost:" + getServerPort() + "/persistent?create=true&createMode=PERSISTENT").to("mock:create-mode"); - } - }, new RouteBuilder() { - public void configure() throws Exception { - from("direct:delete").to("zookeeper://localhost:39913/to-be-deleted").to("mock:delete"); - } - }}; - } - - @Test - public void testRoundtripOfDataToAndFromZnode() throws Exception { - MockEndpoint mock = getMockEndpoint("mock:consumed-from-node"); - MockEndpoint pipeline = getMockEndpoint("mock:producer-out"); - mock.expectedMessageCount(1); - pipeline.expectedMessageCount(1); - - Exchange e = createExchangeWithBody(testPayload); - e.setPattern(ExchangePattern.InOut); - template.send("direct:roundtrip", e); - - assertMockEndpointsSatisfied(); - } - - @Test - public void testAsyncRoundtripOfDataToAndFromZnode() throws Exception { - MockEndpoint mock = getMockEndpoint("mock:consumed-from-node"); - mock.expectedMessageCount(1); - - Exchange e = createExchangeWithBody(testPayload); - template.send("direct:roundtrip", e); - - assertMockEndpointsSatisfied(); - } - - @Test - public void setUsingCreateModeFromHeader() throws Exception { - client.createPersistent("/modes-test", "parent for modes"); - for (CreateMode mode : CreateMode.values()) { - Exchange exchange = createExchangeWithBody(testPayload); - exchange.getIn().setHeader(ZOOKEEPER_CREATE_MODE, mode); - exchange.getIn().setHeader(ZOOKEEPER_NODE, "/modes-test/" + mode); - exchange.setPattern(ExchangePattern.InOut); - template.send("direct:node-from-header", exchange); - } - GetChildrenOperation listing = new GetChildrenOperation(getConnection(), "/modes-test"); - assertEquals(CreateMode.values().length, listing.get().getResult().size()); - } - - @Test - public void createWithOtherCreateMode() throws Exception { - MockEndpoint mock = getMockEndpoint("mock:create-mode"); - mock.expectedMessageCount(1); - - Exchange e = createExchangeWithBody(testPayload); - e.setPattern(ExchangePattern.InOut); - - template.send("direct:create-mode", e); - - assertMockEndpointsSatisfied(); - - Stat s = mock.getReceivedExchanges().get(0).getIn().getHeader(ZooKeeperMessage.ZOOKEEPER_STATISTICS, Stat.class); - assertEquals(s.getEphemeralOwner(), 0); - } - - @Test - public void deleteNode() throws Exception { - MockEndpoint mock = getMockEndpoint("mock:delete"); - mock.expectedMessageCount(1); - - client.createPersistent("/to-be-deleted", "to be deleted"); - Exchange e = createExchangeWithBody(null); - e.setPattern(ExchangePattern.InOut); - e.getIn().setHeader(ZOOKEEPER_OPERATION, "DELETE"); - template.send("direct:delete", e); - - assertMockEndpointsSatisfied(); - - assertNull(client.getConnection().exists("/to-be-deleted", false)); - } - - @Test - public void setAndGetListing() throws Exception { - client.createPersistent("/set-listing", "parent for set and list test"); - - Exchange exchange = createExchangeWithBody(testPayload); - exchange.getIn().setHeader(ZOOKEEPER_NODE, "/set-listing/firstborn"); - exchange.setPattern(ExchangePattern.InOut); - template.send("zookeeper://localhost:" + getServerPort() + "/set-listing?create=true&listChildren=true", exchange); - List children = exchange.getOut().getMandatoryBody(List.class); - assertEquals(1, children.size()); - assertEquals("firstborn", children.get(0)); - } - - @Test - public void testZookeeperMessage() throws Exception { - MockEndpoint mock = getMockEndpoint("mock:consumed-from-node"); - mock.expectedMessageCount(1); - - Exchange exchange = createExchangeWithBody(testPayload); - template.send("direct:roundtrip", exchange); - - assertMockEndpointsSatisfied(); - - Message received = mock.getReceivedExchanges().get(0).getIn(); - assertEquals("/node", ZooKeeperMessage.getPath(received)); - assertNotNull(ZooKeeperMessage.getStatistics(received)); - } -}