camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject svn commit: r1417392 - in /camel/trunk/components/camel-zookeeper/src: main/java/org/apache/camel/component/zookeeper/ test/java/org/apache/camel/component/zookeeper/
Date Wed, 05 Dec 2012 12:48:27 GMT
Author: ningjiang
Date: Wed Dec  5 12:48:27 2012
New Revision: 1417392

URL: http://svn.apache.org/viewvc?rev=1417392&view=rev
Log:
CAMEL-5843 camel-zookeeper - Add functionality to delete a node with thanks to Benjamin

Modified:
    camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperMessage.java
    camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
    camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperProducerTest.java

Modified: camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperMessage.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperMessage.java?rev=1417392&r1=1417391&r2=1417392&view=diff
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperMessage.java
(original)
+++ camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperMessage.java
Wed Dec  5 12:48:27 2012
@@ -46,6 +46,8 @@ public class ZooKeeperMessage extends De
 
     public static final String ZOOKEEPER_EVENT_TYPE = "CamelZookeeperEventType";
 
+    public static final String ZOOKEEPER_OPERATION = "CamelZookeeperOperation";
+
     public ZooKeeperMessage(String node, Stat statistics, WatchedEvent watchedEvent) {
         this(node, statistics, Collections.<String, Object>emptyMap(), watchedEvent);
     }

Modified: camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java?rev=1417392&r1=1417391&r2=1417392&view=diff
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
(original)
+++ camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
Wed Dec  5 12:48:27 2012
@@ -21,6 +21,7 @@ import static java.lang.String.format;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.component.zookeeper.operations.CreateOperation;
+import org.apache.camel.component.zookeeper.operations.DeleteOperation;
 import org.apache.camel.component.zookeeper.operations.GetChildrenOperation;
 import org.apache.camel.component.zookeeper.operations.OperationResult;
 import org.apache.camel.component.zookeeper.operations.SetDataOperation;
@@ -28,6 +29,7 @@ import org.apache.camel.impl.DefaultProd
 import org.apache.camel.util.ExchangeHelper;
 
 import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.ZooKeeper;
@@ -47,6 +49,8 @@ import static org.apache.camel.component
  */
 @SuppressWarnings("rawtypes")
 public class ZookeeperProducer extends DefaultProducer {
+    public static final String ZK_OPERATION_WRITE  = "WRITE";
+    public static final String ZK_OPERATION_DELETE = "DELETE";
 
     private ZooKeeperConfiguration configuration;
 
@@ -63,20 +67,46 @@ public class ZookeeperProducer extends D
         ZooKeeper connection = zkm.getConnection();
         ProductionContext context = new ProductionContext(connection, exchange);
 
+        String operation = exchange.getIn().getHeader(ZooKeeperMessage.ZOOKEEPER_OPERATION,
String.class);
+        boolean isDelete = ZK_OPERATION_DELETE.equals(operation);
+        
         if (ExchangeHelper.isOutCapable(exchange)) {
-            if (log.isDebugEnabled()) {
-                log.debug(format("Storing data to znode '%s', waiting for confirmation",
context.node));
-            }
+            if (isDelete) {
+                if (log.isDebugEnabled()) {
+                    log.debug(format("Deleting znode '%s', waiting for confirmation", context.node));
+                }
 
-            OperationResult result = synchronouslySetData(context);
-            if (configuration.listChildren()) {
-                result = listChildren(context);
-            }
+                OperationResult result = synchronouslyDelete(context);
+                if (configuration.listChildren()) {
+                    result = listChildren(context);
+                }
+                updateExchangeWithResult(context, result);
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug(format("Storing data to znode '%s', waiting for confirmation",
context.node));
+                }
 
-            updateExchangeWithResult(context, result);
+                OperationResult result = synchronouslySetData(context);
+                if (configuration.listChildren()) {
+                    result = listChildren(context);
+                }
+                updateExchangeWithResult(context, result);
+            }
         } else {
-            asynchronouslySetDataOnNode(connection, context);
+            if (isDelete) {
+                asynchronouslyDeleteNode(connection, context);
+            } else {
+                asynchronouslySetDataOnNode(connection, context);
+            }
+        }
+    }
+
+    private void asynchronouslyDeleteNode(ZooKeeper connection, ProductionContext context)
{
+        if (log.isDebugEnabled()) {
+            log.debug(format("Deleting node '%s', not waiting for confirmation", context.node));
         }
+        connection.delete(context.node, context.version, new AsyncDeleteCallback(), context);
+
     }
 
     private void asynchronouslySetDataOnNode(ZooKeeper connection, ProductionContext context)
{
@@ -144,6 +174,19 @@ public class ZookeeperProducer extends D
         }
     }
 
+    private class AsyncDeleteCallback implements VoidCallback {
+        @Override
+        public void processResult(int rc, String path, Object ctx) {
+            if (log.isDebugEnabled()) {
+                if (log.isTraceEnabled()) {
+                    log.trace(format("Removed data node '%s'", path));
+                } else {
+                    log.debug(format("Removed data node '%s'", path));
+                }
+            }
+        }
+    }
+    
     private OperationResult<String> createNode(ProductionContext ctx) throws Exception
{
         CreateOperation create = new CreateOperation(ctx.connection, ctx.node);
         create.setPermissions(getAclListFromMessage(ctx.exchange.getIn()));
@@ -180,6 +223,20 @@ public class ZookeeperProducer extends D
         return result;
     }
 
+    private OperationResult synchronouslyDelete(ProductionContext ctx) throws Exception {
+        DeleteOperation setData = new DeleteOperation(ctx.connection, ctx.node);
+        setData.setVersion(ctx.version);
+
+        OperationResult result = setData.get();
+
+        if (!result.isOk() && configuration.shouldCreate() && result.failedDueTo(Code.NONODE))
{
+            log.warn(format("Node '%s' did not exist, creating it.", ctx.node));
+            result = createNode(ctx);
+        }
+        return result;
+    }
+
+    
     private void logStoreComplete(String path, Stat statistics) {
         if (log.isDebugEnabled()) {
             if (log.isTraceEnabled()) {

Modified: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperProducerTest.java?rev=1417392&r1=1417391&r2=1417392&view=diff
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperProducerTest.java
(original)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperProducerTest.java
Wed Dec  5 12:48:27 2012
@@ -32,6 +32,7 @@ import org.junit.Test;
 
 import static org.apache.camel.component.zookeeper.ZooKeeperMessage.ZOOKEEPER_CREATE_MODE;
 import static org.apache.camel.component.zookeeper.ZooKeeperMessage.ZOOKEEPER_NODE;
+import static org.apache.camel.component.zookeeper.ZooKeeperMessage.ZOOKEEPER_OPERATION;;
 
 public class ZookeeperProducerTest extends ZooKeeperTestSupport {
 
@@ -60,6 +61,10 @@ public class ZookeeperProducerTest exten
             public void configure() throws Exception {
                 from("direct:create-mode").to("zookeeper://localhost:39913/persistent?create=true&createMode=PERSISTENT").to("mock:create-mode");
             }
+        }, new RouteBuilder() {
+            public void configure() throws Exception {
+                from("direct:delete").to("zookeeper://localhost:39913/to-be-deleted").to("mock:delete");
+            }
         }};
     }
 
@@ -133,6 +138,20 @@ public class ZookeeperProducerTest exten
         assertEquals(s.getEphemeralOwner(), 0);
     }
 
+    public void deleteNode() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:delete");
+        mock.expectedMessageCount(1);
+
+        client.createPersistent("/to-be-deleted", "to be deleted");
+        Exchange e = createExchangeWithBody(null);
+        e.setPattern(ExchangePattern.InOut);
+        e.getIn().setHeader(ZOOKEEPER_OPERATION, "DELETE");
+        template.send("direct:delete", e);
+
+        mock.await(5, TimeUnit.SECONDS);
+        assertNull(client.getConnection().exists("/to-be-deleted", false));
+    }
+
     @Test
     public void setAndGetListing() throws Exception {
         client.createPersistent("/set-listing", "parent for set and list test");



Mime
View raw message