camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bibr...@apache.org
Subject svn commit: r1330053 - in /camel/trunk/components/camel-jcr/src: main/java/org/apache/camel/component/jcr/ test/java/org/apache/camel/component/jcr/ test/resources/
Date Tue, 24 Apr 2012 22:56:11 GMT
Author: bibryam
Date: Tue Apr 24 22:56:11 2012
New Revision: 1330053

URL: http://svn.apache.org/viewvc?rev=1330053&view=rev
Log:
CAMEL-5155: Support JCR Component as Consumer. Thanks to Woonsan Ko for the contribution

Added:
    camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/EndpointEventListener.java
    camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java
    camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrMessage.java
    camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrConsumerTest.java
    camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrProducerTest.java
    camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrRouteTestSupport.java
Removed:
    camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrRouteTest.java
Modified:
    camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrEndpoint.java
    camel/trunk/components/camel-jcr/src/test/resources/log4j.properties

Added: camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/EndpointEventListener.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/EndpointEventListener.java?rev=1330053&view=auto
==============================================================================
--- camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/EndpointEventListener.java
(added)
+++ camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/EndpointEventListener.java
Tue Apr 24 22:56:11 2012
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jcr;
+
+import javax.jcr.observation.EventIterator;
+import javax.jcr.observation.EventListener;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
+
+/**
+ * A JCR {@link EventListener} which can be used to delegate processing to a
+ * Camel endpoint.
+ *
+ * @version $Id$
+ */
+public class EndpointEventListener implements EventListener {
+
+    private static final transient Logger LOG = LoggerFactory.getLogger(EndpointEventListener.class);
+
+    private final JcrEndpoint endpoint;
+    private final Processor processor;
+
+    public EndpointEventListener(JcrEndpoint endpoint, Processor processor) {
+        this.endpoint = endpoint;
+        this.processor = processor;
+    }
+
+    public void onEvent(EventIterator events) {
+        LOG.trace("onEvent START");
+        LOG.debug("{} consumer received JCR events: {}", endpoint, events);
+        RuntimeCamelException rce = null;
+
+        try {
+            final Exchange exchange = createExchange(events);
+
+            try {
+                LOG.debug("Processor, {}, is processing exchange, {}", processor, exchange);
+                processor.process(exchange);
+            } catch (Exception e) {
+                exchange.setException(e);
+            }
+
+            rce = exchange.getException(RuntimeCamelException.class);
+        } catch (Exception e) {
+            rce = wrapRuntimeCamelException(e);
+        }
+
+        if (rce != null) {
+            LOG.trace("onEvent END throwing exception: {}", rce.toString());
+            throw rce;
+        }
+
+        LOG.trace("onEvent END");
+    }
+
+    private Exchange createExchange(EventIterator events) {
+        Exchange exchange = endpoint.createExchange();
+        exchange.setIn(new JcrMessage(events));
+        return exchange;
+    }
+}

Added: camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java?rev=1330053&view=auto
==============================================================================
--- camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java
(added)
+++ camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java
Tue Apr 24 22:56:11 2012
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jcr;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.observation.EventListener;
+
+import org.apache.camel.Processor;
+import org.apache.camel.SuspendableService;
+import org.apache.camel.impl.DefaultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link org.apache.camel.Consumer} to consume JCR events.
+ *
+ * @version $Id$
+ */
+public class JcrConsumer extends DefaultConsumer implements SuspendableService {
+
+    private static final transient Logger LOG = LoggerFactory.getLogger(JcrConsumer.class);
+
+    private Session session;
+    private EventListener eventListener;
+    private ScheduledFuture<?> sessionListenerCheckerScheduledFuture;
+
+    public JcrConsumer(JcrEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        scheduleSessionListenerChecker();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        cancelSessionListenerChecker();
+        unregisterListenerAndLogoutSession();
+    }
+
+    @Override
+    protected void doSuspend() throws Exception {
+        super.doSuspend();
+        cancelSessionListenerChecker();
+        unregisterListenerAndLogoutSession();
+    }
+
+    @Override
+    protected void doResume() throws Exception {
+        super.doResume();
+        scheduleSessionListenerChecker();
+    }
+
+    protected JcrEndpoint getJcrEndpoint() {
+        JcrEndpoint endpoint = (JcrEndpoint) getEndpoint();
+        return endpoint;
+    }
+
+    private synchronized void createSessionAndRegisterListener() throws RepositoryException
{
+        LOG.trace("createSessionAndRegisterListener START");
+
+        session = getJcrEndpoint().getRepository().login(getJcrEndpoint().getCredentials());
+
+        int eventTypes = getJcrEndpoint().getEventTypes();
+        String absPath = getJcrEndpoint().getBase();
+
+        if (absPath == null) {
+            absPath = "/";
+        } else if (!absPath.startsWith("/")) {
+            absPath = "/" + absPath;
+        }
+
+        boolean isDeep = getJcrEndpoint().isDeep();
+        String[] uuid = null;
+        String uuids = getJcrEndpoint().getUuids();
+
+        if (uuids != null) {
+            uuids = uuids.trim();
+
+            if (!"".equals(uuids)) {
+                uuid = uuids.split(",");
+            }
+        }
+
+        String[] nodeTypeName = null;
+        String nodeTypeNames = getJcrEndpoint().getNodeTypeNames();
+
+        if (nodeTypeNames != null) {
+            nodeTypeNames = nodeTypeNames.trim();
+
+            if (!"".equals(nodeTypeNames)) {
+                nodeTypeName = nodeTypeNames.split(",");
+            }
+        }
+
+        boolean noLocal = getJcrEndpoint().isNoLocal();
+
+        eventListener = new EndpointEventListener(getJcrEndpoint(), getProcessor());
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Adding JCR Event Listener, {}, on {}. eventTypes=" + eventTypes +
", isDeep=" + isDeep
+                    + ", uuid=" + uuid + ", nodeTypeName=" + nodeTypeName + ", noLocal="
+ noLocal, eventListener,
+                    absPath);
+        }
+
+        session.getWorkspace().getObservationManager()
+                .addEventListener(eventListener, eventTypes, absPath, isDeep, uuid, nodeTypeName,
noLocal);
+
+        LOG.trace("createSessionAndRegisterListener END");
+    }
+
+    private synchronized void unregisterListenerAndLogoutSession() throws RepositoryException
{
+        LOG.trace("unregisterListenerAndLogoutSession START");
+
+        if (session != null) {
+            try {
+                if (!session.isLive()) {
+                    LOG.info("Session was is no more live.");
+                } else {
+                    if (eventListener != null) {
+                        session.getWorkspace().getObservationManager().removeEventListener(eventListener);
+                        eventListener = null;
+                    }
+
+                    session.logout();
+                }
+            } finally {
+                eventListener = null;
+                session = null;
+            }
+        }
+
+        LOG.trace("unregisterListenerAndLogoutSession END");
+    }
+
+    private void cancelSessionListenerChecker() {
+        if (sessionListenerCheckerScheduledFuture != null) {
+            sessionListenerCheckerScheduledFuture.cancel(true);
+        }
+    }
+
+    private void scheduleSessionListenerChecker() {
+        String name = "JcrConsumerSessionChecker[" + getJcrEndpoint().getEndpointConfiguredDestinationName()
+ "]";
+        ScheduledExecutorService executor = getJcrEndpoint().getCamelContext().getExecutorServiceManager()
+                .newSingleThreadScheduledExecutor(this, name);
+        JcrConsumerSessionListenerChecker sessionListenerChecker = new JcrConsumerSessionListenerChecker();
+        long sessionLiveCheckIntervalOnStart = JcrConsumer.this.getJcrEndpoint().getSessionLiveCheckIntervalOnStart();
+        long sessionLiveCheckInterval = JcrConsumer.this.getJcrEndpoint().getSessionLiveCheckInterval();
+        sessionListenerCheckerScheduledFuture = executor.scheduleWithFixedDelay(sessionListenerChecker,
+                sessionLiveCheckIntervalOnStart, sessionLiveCheckInterval, TimeUnit.MILLISECONDS);
+    }
+
+    private class JcrConsumerSessionListenerChecker implements Runnable {
+
+        public void run() {
+            LOG.debug("JcrConsumerSessionListenerChecker starts.");
+
+            boolean isSessionLive = false;
+
+            if (JcrConsumer.this.session != null) {
+                try {
+                    isSessionLive = JcrConsumer.this.session.isLive();
+                } catch (Exception e) {
+                    LOG.debug("Exception while checking jcr session", e);
+                }
+            }
+
+            if (!isSessionLive) {
+                try {
+                    createSessionAndRegisterListener();
+                } catch (RepositoryException e) {
+                    LOG.error("Failed to create session and register listener", e);
+                }
+            }
+
+            LOG.debug("JcrConsumerSessionListenerChecker stops.");
+        }
+    }
+
+}

Modified: camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrEndpoint.java?rev=1330053&r1=1330052&r2=1330053&view=diff
==============================================================================
--- camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrEndpoint.java
(original)
+++ camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrEndpoint.java
Tue Apr 24 22:56:11 2012
@@ -28,6 +28,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.util.ObjectHelper;
 
 /**
  * A JCR endpoint
@@ -38,6 +39,15 @@ public class JcrEndpoint extends Default
     private Repository repository;
     private String base;
 
+    private int eventTypes;
+    private boolean deep;
+    private String uuids;
+    private String nodeTypeNames;
+    private boolean noLocal;
+
+    private long sessionLiveCheckIntervalOnStart = 3000L;
+    private long sessionLiveCheckInterval = 60000L;
+
     protected JcrEndpoint(String endpointUri, JcrComponent component) {
         super(endpointUri, component);
         try {
@@ -47,8 +57,7 @@ public class JcrEndpoint extends Default
                 if (creds != null) {
                     String username = creds[0];
                     String password = creds.length > 1 ? creds[1] : null;
-                    this.credentials = new SimpleCredentials(username, password
-                            .toCharArray());
+                    this.credentials = new SimpleCredentials(username, password.toCharArray());
                 }
             }
             this.repository = component.getCamelContext().getRegistry().lookup(uri.getHost(),
Repository.class);
@@ -66,7 +75,7 @@ public class JcrEndpoint extends Default
      * @throws RuntimeCamelException
      */
     public Consumer createConsumer(Processor processor) throws Exception {
-        throw new RuntimeCamelException("No consumer endpoint support for JCR available");
+        return new JcrConsumer(this, processor);
     }
 
     public Producer createProducer() throws Exception {
@@ -104,4 +113,133 @@ public class JcrEndpoint extends Default
         return base;
     }
 
+    /**
+     * <code>eventTypes</code> (a combination of one or more event types encoded
+     * as a bit mask value such as javax.jcr.observation.Event.NODE_ADDED, javax.jcr.observation.Event.NODE_REMOVED,
etc.).
+     * 
+     * @return eventTypes
+     * @see {@link javax.jcr.observation.Event}
+     * @see {@link javax.jcr.observation.ObservationManager#addEventListener(javax.jcr.observation.EventListener,
int, String, boolean, String[], String[], boolean)}
+     */
+    public int getEventTypes() {
+        return eventTypes;
+    }
+
+    public void setEventTypes(int eventTypes) {
+        this.eventTypes = eventTypes;
+    }
+
+    /**
+     * When <code>isDeep</code> is true, events whose associated parent node
is at
+     * <code>absPath</code> or within its subgraph are received.
+     * @return deep
+     */
+    public boolean isDeep() {
+        return deep;
+    }
+
+    public void setDeep(boolean deep) {
+        this.deep = deep;
+    }
+
+    /**
+     * When a comma separated uuid list string is set, only events whose associated parent
node has one of
+     * the identifiers in the comma separated uuid list will be received.
+     * @return comma separated uuid list string
+     */
+    public String getUuids() {
+        return uuids;
+    }
+
+    public void setUuids(String uuids) {
+        this.uuids = uuids;
+    }
+
+    /**
+     * When a comma separated <code>nodeTypeName</code> list string is set, only
events whose associated parent node has
+     * one of the node types (or a subtype of one of the node types) in this
+     * list will be received.
+     * @return
+     */
+    public String getNodeTypeNames() {
+        return nodeTypeNames;
+    }
+
+    public void setNodeTypeNames(String nodeTypeNames) {
+        this.nodeTypeNames = nodeTypeNames;
+    }
+
+    /**
+     * If <code>noLocal</code> is <code>true</code>, then events
+     * generated by the session through which the listener was registered are
+     * ignored. Otherwise, they are not ignored.
+     * @return noLocal
+     */
+    public boolean isNoLocal() {
+        return noLocal;
+    }
+
+    public void setNoLocal(boolean noLocal) {
+        this.noLocal = noLocal;
+    }
+
+    /**
+     * Interval in milliseconds to wait before the first session live checking.
+     * The default value is 3000 ms.
+     * 
+     * @return sessionLiveCheckIntervalOnStart
+     */
+    public long getSessionLiveCheckIntervalOnStart() {
+        return sessionLiveCheckIntervalOnStart;
+    }
+
+    public void setSessionLiveCheckIntervalOnStart(long sessionLiveCheckIntervalOnStart)
{
+        if (sessionLiveCheckIntervalOnStart <= 0) {
+            throw new IllegalArgumentException("sessionLiveCheckIntervalOnStart must be positive
value");
+        }
+
+        this.sessionLiveCheckIntervalOnStart = sessionLiveCheckIntervalOnStart;
+    }
+
+    /**
+     * Interval in milliseconds to wait before each session live checking
+     * The default value is 60000 ms.
+     * 
+     * @return
+     */
+    public long getSessionLiveCheckInterval() {
+        return sessionLiveCheckInterval;
+    }
+
+    public void setSessionLiveCheckInterval(long sessionLiveCheckInterval) {
+        if (sessionLiveCheckInterval <= 0) {
+            throw new IllegalArgumentException("sessionLiveCheckInterval must be positive
value");
+        }
+
+        this.sessionLiveCheckInterval = sessionLiveCheckInterval;
+    }
+
+    /**
+     * Gets the destination name which was configured from the endpoint uri.
+     *
+     * @return the destination name resolved from the endpoint uri
+     */
+    public String getEndpointConfiguredDestinationName() {
+        String remainder = ObjectHelper.after(getEndpointKey(), "//");
+
+        if (remainder != null && remainder.contains("@")) {
+            remainder = remainder.substring(remainder.indexOf('@'));
+        }
+
+        if (remainder != null && remainder.contains("?")) {
+            // remove parameters
+            remainder = ObjectHelper.before(remainder, "?");
+        }
+
+        if (ObjectHelper.isEmpty(remainder)) {
+            return remainder;
+        }
+
+        return remainder;
+    }
 }

Added: camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrMessage.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrMessage.java?rev=1330053&view=auto
==============================================================================
--- camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrMessage.java
(added)
+++ camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrMessage.java
Tue Apr 24 22:56:11 2012
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jcr;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.jcr.observation.Event;
+import javax.jcr.observation.EventIterator;
+
+import org.apache.camel.impl.DefaultMessage;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Represents a {@link org.apache.camel.Message} for working with JCR
+ *
+ * @version $Id$
+ */
+public class JcrMessage extends DefaultMessage {
+
+    private EventIterator eventIterator;
+    private List<Event> eventList;
+
+    public JcrMessage(EventIterator eventIterator) {
+        this.eventIterator = eventIterator;
+    }
+
+    @Override
+    public String toString() {
+        if (eventIterator != null) {
+            return "JcrMessage[eventIterator: " + eventIterator + ", eventList: " + eventList
+ "]";
+        }
+
+        return "JcrMessage@" + ObjectHelper.getIdentityHashCode(this);
+    }
+
+    @Override
+    public void copyFrom(org.apache.camel.Message that) {
+        // must initialize headers before we set the JmsMessage to avoid Camel
+        // populating it before we do the copy
+        getHeaders().clear();
+
+        if (that instanceof JcrMessage) {
+            JcrMessage thatMessage = (JcrMessage) that;
+            this.eventIterator = thatMessage.eventIterator;
+            this.eventList = thatMessage.eventList;
+        }
+
+        // copy body and fault flag
+        setBody(that.getBody());
+        setFault(that.isFault());
+
+        // we have already cleared the headers
+        if (that.hasHeaders()) {
+            getHeaders().putAll(that.getHeaders());
+        }
+
+        getAttachments().clear();
+
+        if (that.hasAttachments()) {
+            getAttachments().putAll(that.getAttachments());
+        }
+    }
+
+    public EventIterator getEventIterator() {
+        return eventIterator;
+    }
+
+    @Override
+    protected Object createBody() {
+        if (eventList == null) {
+            eventList = new LinkedList<Event>();
+
+            if (eventIterator != null) {
+                while (eventIterator.hasNext()) {
+                    eventList.add(eventIterator.nextEvent());
+                }
+            }
+        }
+
+        return eventList;
+    }
+}

Added: camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrConsumerTest.java?rev=1330053&view=auto
==============================================================================
--- camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrConsumerTest.java
(added)
+++ camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrConsumerTest.java
Tue Apr 24 22:56:11 2012
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jcr;
+
+import java.util.List;
+import javax.jcr.Node;
+import javax.jcr.Session;
+import javax.jcr.SimpleCredentials;
+import javax.jcr.observation.Event;
+import javax.jcr.observation.EventIterator;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JcrConsumerTest
+ *
+ * @version $Id$
+ */
+public class JcrConsumerTest extends JcrRouteTestSupport {
+
+    private static final transient Logger LOG = LoggerFactory.getLogger(JcrConsumerTest.class);
+
+    private String absPath = "/home/test";
+    private int eventTypes = Event.NODE_ADDED;
+    private boolean deep = true;
+    private String uuids;
+    private String nodeTypeNames;
+    private boolean noLocal;
+
+    @Test
+    public void testJcrConsumer() throws Exception {
+        // start consumer thread first
+        JcrConsumerThread consumerThread = new JcrConsumerThread();
+        consumerThread.start();
+        // wait until the consumer thread has tried to receive event at least once
+        while (consumerThread.getReceiveTrialTimes() < 1) {
+            Thread.sleep(10L);
+        }
+
+        // now create a node under the specified event node path
+
+        Session session = getRepository().login(new SimpleCredentials("user", "pass".toCharArray()));
+
+        try {
+            Node folderNode = session.getRootNode();
+
+            for (String folderNodeName : absPath.split("\\/")) {
+                if (!"".equals(folderNodeName)) {
+                    if (folderNode.hasNode(folderNodeName)) {
+                        folderNode.getNode(folderNodeName).remove();
+                    }
+
+                    folderNode = folderNode.addNode(folderNodeName, "nt:unstructured");
+                }
+            }
+
+            folderNode.addNode("node", "nt:unstructured");
+            session.save();
+        } finally {
+            if (session != null && session.isLive()) {
+                session.logout();
+            }
+        }
+
+        // wait until the consumer thread captures an event
+        consumerThread.join();
+
+        Exchange exchange = consumerThread.getExchange();
+        assertNotNull(exchange);
+
+        Message message = exchange.getIn();
+        assertNotNull(message);
+        assertTrue(message instanceof JcrMessage);
+        EventIterator eventIterator = ((JcrMessage)message).getEventIterator();
+        assertNotNull(eventIterator);
+        assertEquals(1, eventIterator.getSize());
+
+        Object body = message.getBody();
+        assertTrue(body instanceof List);
+        @SuppressWarnings("unchecked")
+        List<Event> eventList = (List<Event>)body;
+        assertEquals(1, eventList.size());
+        Event event = eventList.get(0);
+        assertEquals(Event.NODE_ADDED, event.getType());
+        assertNotNull(event.getPath());
+        assertTrue(event.getPath().startsWith(absPath));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                String uri = "jcr://user:pass@repository";
+                uri += absPath;
+                uri += "?eventTypes=" + eventTypes;
+                uri += "&deep=" + deep;
+                uri += "&noLocal=" + noLocal;
+                from(uri).to("direct:a");
+            }
+        };
+    }
+
+    private class JcrConsumerThread extends Thread {
+
+        private Exchange exchange;
+        private int receiveTrialTimes;
+
+        public void run() {
+            while (exchange == null) {
+                exchange = consumer.receive("direct:a", 10L);
+                ++receiveTrialTimes;
+
+                try {
+                    Thread.sleep(10);
+                } catch (InterruptedException e) {
+                    break;
+                }
+            }
+
+            LOG.debug("JcrConsumerThread receive exchange, {} after {} trials", exchange,
receiveTrialTimes);
+        }
+
+        public Exchange getExchange() {
+            return exchange;
+        }
+
+        public int getReceiveTrialTimes() {
+            return receiveTrialTimes;
+        }
+    }
+}

Added: camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrProducerTest.java?rev=1330053&view=auto
==============================================================================
--- camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrProducerTest.java
(added)
+++ camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrProducerTest.java
Tue Apr 24 22:56:11 2012
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jcr;
+
+import javax.jcr.Node;
+import javax.jcr.Repository;
+import javax.jcr.Session;
+import javax.jcr.SimpleCredentials;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Before;
+import org.junit.Test;
+
+public class JcrProducerTest extends JcrRouteTestSupport {
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        deleteDirectory("target/repository");
+        super.setUp();
+    }
+
+    @Test
+    public void testJcrProducer() throws Exception {
+        Exchange exchange = createExchangeWithBody("<hello>world!</hello>");
+        Exchange out = template.send("direct:a", exchange);
+        assertNotNull(out);
+        String uuid = out.getOut().getBody(String.class);
+        Session session = getRepository().login(new SimpleCredentials("user", "pass".toCharArray()));
+        try {
+            Node node = session.getNodeByIdentifier(uuid);
+            assertNotNull(node);
+            assertEquals("/home/test/node", node.getPath());
+            assertEquals("<hello>world!</hello>", node.getProperty("my.contents.property").getString());
+        } finally {
+            if (session != null && session.isLive()) {
+                session.logout();
+            }
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // START SNIPPET: jcr
+                from("direct:a").setProperty(JcrConstants.JCR_NODE_NAME, constant("node"))
+                        .setProperty("my.contents.property", body())
+                        .to("jcr://user:pass@repository/home/test");
+                // END SNIPPET: jcr
+            }
+        };
+    }
+
+}

Added: camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrRouteTestSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrRouteTestSupport.java?rev=1330053&view=auto
==============================================================================
--- camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrRouteTestSupport.java
(added)
+++ camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrRouteTestSupport.java
Tue Apr 24 22:56:11 2012
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jcr;
+
+import javax.jcr.Repository;
+import javax.naming.Context;
+
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.jackrabbit.core.TransientRepository;
+import org.junit.Before;
+
+/**
+ * JcrRouteTestSupport
+ * 
+ * @version $Id$
+ */
+public abstract class JcrRouteTestSupport extends CamelTestSupport {
+
+    private Repository repository;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        deleteDirectory("target/repository");
+        super.setUp();
+    }
+
+    protected Repository getRepository() {
+        return repository;
+    }
+
+    @Override
+    protected Context createJndiContext() throws Exception {
+        Context context = super.createJndiContext();
+        repository = new TransientRepository("target/repository.xml", "target/repository");
+        context.bind("repository", repository);
+        return context;
+    }
+}

Modified: camel/trunk/components/camel-jcr/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jcr/src/test/resources/log4j.properties?rev=1330053&r1=1330052&r2=1330053&view=diff
==============================================================================
--- camel/trunk/components/camel-jcr/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-jcr/src/test/resources/log4j.properties Tue Apr 24 22:56:11
2012
@@ -19,6 +19,8 @@
 # The logging properties used for testing
 #
 log4j.rootLogger=INFO, file
+log4j.logger.org.apache.camel.component.jcr = TRACE, file
+log4j.additivity.org.apache.camel.component.jcr = false
 
 # uncomment the following to enable camel debugging
 #log4j.logger.org.apache.camel=DEBUG



Mime
View raw message