Author: bibryam
Date: Tue Apr 24 22:56:11 2012
New Revision: 1330053
URL: http://svn.apache.org/viewvc?rev=1330053&view=rev
Log:
CAMEL-5155: Support JCR Component as Consumer. Thanks to Woonsan Ko for the contribution
Added:
camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/EndpointEventListener.java
camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java
camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrMessage.java
camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrConsumerTest.java
camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrProducerTest.java
camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrRouteTestSupport.java
Removed:
camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrRouteTest.java
Modified:
camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrEndpoint.java
camel/trunk/components/camel-jcr/src/test/resources/log4j.properties
Added: camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/EndpointEventListener.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/EndpointEventListener.java?rev=1330053&view=auto
==============================================================================
--- camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/EndpointEventListener.java
(added)
+++ camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/EndpointEventListener.java
Tue Apr 24 22:56:11 2012
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jcr;
+
+import javax.jcr.observation.EventIterator;
+import javax.jcr.observation.EventListener;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
+
+/**
+ * A JCR {@link EventListener} which can be used to delegate processing to a
+ * Camel endpoint.
+ * <p/>
+ * Every batch of events handed to {@link #onEvent(EventIterator)} is wrapped in a
+ * {@link JcrMessage}, set as the IN message of an exchange created by the endpoint,
+ * and passed to the processor.
+ *
+ * @version $Id$
+ */
+public class EndpointEventListener implements EventListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(EndpointEventListener.class);
+
+    private final JcrEndpoint endpoint;
+    private final Processor processor;
+
+    /**
+     * @param endpoint  the endpoint used to create an exchange for each event batch
+     * @param processor the processor each exchange is handed to
+     */
+    public EndpointEventListener(JcrEndpoint endpoint, Processor processor) {
+        this.endpoint = endpoint;
+        this.processor = processor;
+    }
+
+    /**
+     * Wraps the events in an exchange and lets the processor handle it.
+     *
+     * @param events the JCR events delivered by the observation manager
+     * @throws RuntimeCamelException if creating the exchange fails, or if the exchange
+     *                               ends up carrying a {@link RuntimeCamelException}
+     */
+    public void onEvent(EventIterator events) {
+        LOG.trace("onEvent START");
+        LOG.debug("{} consumer received JCR events: {}", endpoint, events);
+        RuntimeCamelException rce = null;
+
+        try {
+            final Exchange exchange = createExchange(events);
+
+            try {
+                LOG.debug("Processor, {}, is processing exchange, {}", processor, exchange);
+                processor.process(exchange);
+            } catch (Exception e) {
+                // keep the failure on the exchange so it is inspected in one place below
+                exchange.setException(e);
+            }
+
+            rce = exchange.getException(RuntimeCamelException.class);
+
+            // Only a RuntimeCamelException is rethrown to the caller. Any other failure
+            // stored on the exchange would otherwise vanish without a trace, so log it.
+            Exception failure = exchange.getException();
+            if (rce == null && failure != null) {
+                LOG.warn("Processing of JCR events failed for exchange " + exchange
+                        + "; the exception is not rethrown to the JCR observation manager", failure);
+            }
+        } catch (Exception e) {
+            rce = wrapRuntimeCamelException(e);
+        }
+
+        if (rce != null) {
+            LOG.trace("onEvent END throwing exception: {}", rce.toString());
+            throw rce;
+        }
+
+        LOG.trace("onEvent END");
+    }
+
+    /**
+     * Creates an exchange from the endpoint whose IN message carries the given events.
+     */
+    private Exchange createExchange(EventIterator events) {
+        Exchange exchange = endpoint.createExchange();
+        exchange.setIn(new JcrMessage(events));
+        return exchange;
+    }
+}
Added: camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java?rev=1330053&view=auto
==============================================================================
--- camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java
(added)
+++ camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java
Tue Apr 24 22:56:11 2012
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jcr;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.observation.EventListener;
+
+import org.apache.camel.Processor;
+import org.apache.camel.SuspendableService;
+import org.apache.camel.impl.DefaultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link org.apache.camel.Consumer} to consume JCR events.
+ * <p/>
+ * The consumer does not log in to the repository directly when it starts. Instead it
+ * schedules a checker task, using the intervals configured on the {@link JcrEndpoint},
+ * which logs in and registers an {@link EndpointEventListener} whenever there is no
+ * live session. Stopping or suspending the consumer cancels the checker, removes the
+ * listener and logs the session out.
+ *
+ * @version $Id$
+ */
+public class JcrConsumer extends DefaultConsumer implements SuspendableService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JcrConsumer.class);
+
+    // written under the consumer's monitor, but also read by the checker thread
+    private volatile Session session;
+    private EventListener eventListener;
+    private ScheduledExecutorService sessionListenerCheckerExecutor;
+    private ScheduledFuture<?> sessionListenerCheckerScheduledFuture;
+
+    public JcrConsumer(JcrEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        scheduleSessionListenerChecker();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        cancelSessionListenerChecker();
+        unregisterListenerAndLogoutSession();
+    }
+
+    @Override
+    protected void doSuspend() throws Exception {
+        super.doSuspend();
+        cancelSessionListenerChecker();
+        unregisterListenerAndLogoutSession();
+    }
+
+    @Override
+    protected void doResume() throws Exception {
+        super.doResume();
+        scheduleSessionListenerChecker();
+    }
+
+    protected JcrEndpoint getJcrEndpoint() {
+        return (JcrEndpoint) getEndpoint();
+    }
+
+    /**
+     * Logs in to the repository and registers the event listener using the options
+     * configured on the endpoint. The new session and listener are only remembered
+     * once registration succeeded; if it fails the new session is logged out again.
+     *
+     * @throws RepositoryException if login or listener registration fails
+     */
+    private synchronized void createSessionAndRegisterListener() throws RepositoryException {
+        LOG.trace("createSessionAndRegisterListener START");
+
+        JcrEndpoint endpoint = getJcrEndpoint();
+        Session newSession = endpoint.getRepository().login(endpoint.getCredentials());
+        boolean registered = false;
+
+        try {
+            int eventTypes = endpoint.getEventTypes();
+            String absPath = toAbsolutePath(endpoint.getBase());
+            boolean isDeep = endpoint.isDeep();
+            String[] uuid = splitCommaSeparated(endpoint.getUuids());
+            String[] nodeTypeName = splitCommaSeparated(endpoint.getNodeTypeNames());
+            boolean noLocal = endpoint.isNoLocal();
+
+            EventListener newListener = new EndpointEventListener(endpoint, getProcessor());
+
+            if (LOG.isDebugEnabled()) {
+                // render the arrays by content; plain concatenation would print identity hashes
+                LOG.debug("Adding JCR Event Listener, {}, on {}. eventTypes={}, isDeep={}, uuid={}, nodeTypeName={}, noLocal={}",
+                        new Object[] {newListener, absPath, eventTypes, isDeep,
+                            java.util.Arrays.toString(uuid), java.util.Arrays.toString(nodeTypeName), noLocal});
+            }
+
+            newSession.getWorkspace().getObservationManager()
+                    .addEventListener(newListener, eventTypes, absPath, isDeep, uuid, nodeTypeName, noLocal);
+
+            session = newSession;
+            eventListener = newListener;
+            registered = true;
+        } finally {
+            if (!registered) {
+                // do not leak a live session that has no listener attached
+                newSession.logout();
+            }
+        }
+
+        LOG.trace("createSessionAndRegisterListener END");
+    }
+
+    /**
+     * Returns "/" for a null base, otherwise the base with a leading "/" ensured.
+     */
+    private static String toAbsolutePath(String base) {
+        if (base == null) {
+            return "/";
+        }
+        return base.startsWith("/") ? base : "/" + base;
+    }
+
+    /**
+     * Splits a comma separated option value.
+     *
+     * @return the parts, or <tt>null</tt> if the value is null or blank
+     */
+    private static String[] splitCommaSeparated(String value) {
+        if (value == null) {
+            return null;
+        }
+        String trimmed = value.trim();
+        return "".equals(trimmed) ? null : trimmed.split(",");
+    }
+
+    /**
+     * Removes the event listener and logs the session out if the session is still
+     * live. The session and listener references are cleared in every case.
+     *
+     * @throws RepositoryException if removing the listener fails
+     */
+    private synchronized void unregisterListenerAndLogoutSession() throws RepositoryException {
+        LOG.trace("unregisterListenerAndLogoutSession START");
+
+        Session current = session;
+
+        if (current != null) {
+            try {
+                if (!current.isLive()) {
+                    LOG.info("Session is no longer live.");
+                } else {
+                    try {
+                        if (eventListener != null) {
+                            current.getWorkspace().getObservationManager().removeEventListener(eventListener);
+                        }
+                    } finally {
+                        // log out even when the listener could not be removed
+                        current.logout();
+                    }
+                }
+            } finally {
+                eventListener = null;
+                session = null;
+            }
+        }
+
+        LOG.trace("unregisterListenerAndLogoutSession END");
+    }
+
+    /**
+     * Cancels the scheduled checker and shuts down the executor that was created for it.
+     */
+    private void cancelSessionListenerChecker() {
+        if (sessionListenerCheckerScheduledFuture != null) {
+            sessionListenerCheckerScheduledFuture.cancel(true);
+            sessionListenerCheckerScheduledFuture = null;
+        }
+
+        if (sessionListenerCheckerExecutor != null) {
+            // a new executor is created on every start/resume, so release the old one here
+            getJcrEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(sessionListenerCheckerExecutor);
+            sessionListenerCheckerExecutor = null;
+        }
+    }
+
+    /**
+     * Creates a single threaded scheduled executor and schedules the checker on it with
+     * the initial delay and fixed delay (both in milliseconds) configured on the endpoint.
+     */
+    private void scheduleSessionListenerChecker() {
+        JcrEndpoint endpoint = getJcrEndpoint();
+        String name = "JcrConsumerSessionChecker[" + endpoint.getEndpointConfiguredDestinationName() + "]";
+        sessionListenerCheckerExecutor = endpoint.getCamelContext().getExecutorServiceManager()
+                .newSingleThreadScheduledExecutor(this, name);
+        JcrConsumerSessionListenerChecker sessionListenerChecker = new JcrConsumerSessionListenerChecker();
+        long sessionLiveCheckIntervalOnStart = endpoint.getSessionLiveCheckIntervalOnStart();
+        long sessionLiveCheckInterval = endpoint.getSessionLiveCheckInterval();
+        sessionListenerCheckerScheduledFuture = sessionListenerCheckerExecutor.scheduleWithFixedDelay(sessionListenerChecker,
+                sessionLiveCheckIntervalOnStart, sessionLiveCheckInterval, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Periodic task which (re)creates the session and listener when there is no live session.
+     */
+    private class JcrConsumerSessionListenerChecker implements Runnable {
+
+        public void run() {
+            LOG.debug("JcrConsumerSessionListenerChecker starts.");
+
+            boolean isSessionLive = false;
+            Session current = JcrConsumer.this.session;
+
+            if (current != null) {
+                try {
+                    isSessionLive = current.isLive();
+                } catch (Exception e) {
+                    LOG.debug("Exception while checking jcr session", e);
+                }
+            }
+
+            if (!isSessionLive) {
+                try {
+                    createSessionAndRegisterListener();
+                } catch (Exception e) {
+                    // catch everything: an exception escaping a task scheduled with
+                    // scheduleWithFixedDelay suppresses all subsequent executions
+                    LOG.error("Failed to create session and register listener", e);
+                }
+            }
+
+            LOG.debug("JcrConsumerSessionListenerChecker stops.");
+        }
+    }
+
+}
Modified: camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrEndpoint.java?rev=1330053&r1=1330052&r2=1330053&view=diff
==============================================================================
--- camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrEndpoint.java
(original)
+++ camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrEndpoint.java
Tue Apr 24 22:56:11 2012
@@ -28,6 +28,7 @@ import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.util.ObjectHelper;
/**
* A JCR endpoint
@@ -38,6 +39,15 @@ public class JcrEndpoint extends Default
private Repository repository;
private String base;
+ private int eventTypes;
+ private boolean deep;
+ private String uuids;
+ private String nodeTypeNames;
+ private boolean noLocal;
+
+ private long sessionLiveCheckIntervalOnStart = 3000L;
+ private long sessionLiveCheckInterval = 60000L;
+
protected JcrEndpoint(String endpointUri, JcrComponent component) {
super(endpointUri, component);
try {
@@ -47,8 +57,7 @@ public class JcrEndpoint extends Default
if (creds != null) {
String username = creds[0];
String password = creds.length > 1 ? creds[1] : null;
- this.credentials = new SimpleCredentials(username, password
- .toCharArray());
+ this.credentials = new SimpleCredentials(username, password.toCharArray());
}
}
this.repository = component.getCamelContext().getRegistry().lookup(uri.getHost(),
Repository.class);
@@ -66,7 +75,7 @@ public class JcrEndpoint extends Default
* @throws RuntimeCamelException
*/
public Consumer createConsumer(Processor processor) throws Exception {
- throw new RuntimeCamelException("No consumer endpoint support for JCR available");
+ return new JcrConsumer(this, processor);
}
public Producer createProducer() throws Exception {
@@ -104,4 +113,133 @@ public class JcrEndpoint extends Default
return base;
}
+    /**
+     * <code>eventTypes</code> (a combination of one or more event types encoded
+     * as a bit mask value such as javax.jcr.observation.Event.NODE_ADDED,
+     * javax.jcr.observation.Event.NODE_REMOVED, etc.).
+     *
+     * @return eventTypes
+     * @see javax.jcr.observation.Event
+     * @see javax.jcr.observation.ObservationManager#addEventListener(javax.jcr.observation.EventListener, int, String, boolean, String[], String[], boolean)
+     */
+    public int getEventTypes() {
+        return eventTypes;
+    }
+
+    public void setEventTypes(int eventTypes) {
+        this.eventTypes = eventTypes;
+    }
+
+    /**
+     * When <code>isDeep</code> is true, events whose associated parent node is at
+     * <code>absPath</code> or within its subgraph are received.
+     *
+     * @return deep
+     */
+    public boolean isDeep() {
+        return deep;
+    }
+
+    public void setDeep(boolean deep) {
+        this.deep = deep;
+    }
+
+    /**
+     * When a comma separated uuid list string is set, only events whose associated
+     * parent node has one of the identifiers in the comma separated uuid list will
+     * be received.
+     *
+     * @return comma separated uuid list string
+     */
+    public String getUuids() {
+        return uuids;
+    }
+
+    public void setUuids(String uuids) {
+        this.uuids = uuids;
+    }
+
+    /**
+     * When a comma separated <code>nodeTypeName</code> list string is set, only
+     * events whose associated parent node has one of the node types (or a subtype
+     * of one of the node types) in this list will be received.
+     *
+     * @return comma separated node type name list string
+     */
+    public String getNodeTypeNames() {
+        return nodeTypeNames;
+    }
+
+    public void setNodeTypeNames(String nodeTypeNames) {
+        this.nodeTypeNames = nodeTypeNames;
+    }
+
+    /**
+     * If <code>noLocal</code> is <code>true</code>, then events
+     * generated by the session through which the listener was registered are
+     * ignored. Otherwise, they are not ignored.
+     *
+     * @return noLocal
+     */
+    public boolean isNoLocal() {
+        return noLocal;
+    }
+
+    public void setNoLocal(boolean noLocal) {
+        this.noLocal = noLocal;
+    }
+
+    /**
+     * Interval in milliseconds to wait before the first session live checking.
+     * The default value is 3000 ms.
+     *
+     * @return sessionLiveCheckIntervalOnStart
+     */
+    public long getSessionLiveCheckIntervalOnStart() {
+        return sessionLiveCheckIntervalOnStart;
+    }
+
+    /**
+     * @param sessionLiveCheckIntervalOnStart interval in milliseconds, must be greater than zero
+     * @throws IllegalArgumentException if the value is zero or negative
+     */
+    public void setSessionLiveCheckIntervalOnStart(long sessionLiveCheckIntervalOnStart) {
+        if (sessionLiveCheckIntervalOnStart <= 0) {
+            throw new IllegalArgumentException("sessionLiveCheckIntervalOnStart must be positive value");
+        }
+
+        this.sessionLiveCheckIntervalOnStart = sessionLiveCheckIntervalOnStart;
+    }
+
+    /**
+     * Interval in milliseconds to wait before each session live checking.
+     * The default value is 60000 ms.
+     *
+     * @return sessionLiveCheckInterval
+     */
+    public long getSessionLiveCheckInterval() {
+        return sessionLiveCheckInterval;
+    }
+
+    /**
+     * @param sessionLiveCheckInterval interval in milliseconds, must be greater than zero
+     * @throws IllegalArgumentException if the value is zero or negative
+     */
+    public void setSessionLiveCheckInterval(long sessionLiveCheckInterval) {
+        if (sessionLiveCheckInterval <= 0) {
+            throw new IllegalArgumentException("sessionLiveCheckInterval must be positive value");
+        }
+
+        this.sessionLiveCheckInterval = sessionLiveCheckInterval;
+    }
+
+    /**
+     * Gets the destination name which was configured from the endpoint uri.
+     * <p/>
+     * The name is the part of the endpoint key after <tt>//</tt>, with any query
+     * parameters and any leading <tt>credentials@</tt> part removed.
+     *
+     * @return the destination name resolved from the endpoint uri; presumably
+     *         <tt>null</tt> when the endpoint key contains no <tt>//</tt>
+     */
+    public String getEndpointConfiguredDestinationName() {
+        String remainder = ObjectHelper.after(getEndpointKey(), "//");
+
+        if (remainder != null && remainder.contains("?")) {
+            // remove parameters first, so an '@' inside a parameter value is never
+            // mistaken for the credentials separator
+            remainder = ObjectHelper.before(remainder, "?");
+        }
+
+        if (remainder != null && remainder.contains("@")) {
+            // skip past the '@' so neither the credentials nor the separator are kept
+            remainder = remainder.substring(remainder.indexOf('@') + 1);
+        }
+
+        return remainder;
+    }
}
Added: camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrMessage.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrMessage.java?rev=1330053&view=auto
==============================================================================
--- camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrMessage.java
(added)
+++ camel/trunk/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrMessage.java
Tue Apr 24 22:56:11 2012
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jcr;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.jcr.observation.Event;
+import javax.jcr.observation.EventIterator;
+
+import org.apache.camel.impl.DefaultMessage;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Represents a {@link org.apache.camel.Message} for working with JCR.
+ * <p/>
+ * The message wraps the {@link EventIterator} delivered to a JCR event listener.
+ * Its body is created on demand as a list holding the events of that iterator.
+ *
+ * @version $Id$
+ */
+public class JcrMessage extends DefaultMessage {
+
+    private EventIterator eventIterator;
+    private List<Event> eventList;
+
+    /**
+     * @param eventIterator the JCR events this message carries
+     */
+    public JcrMessage(EventIterator eventIterator) {
+        this.eventIterator = eventIterator;
+    }
+
+    @Override
+    public String toString() {
+        if (eventIterator != null) {
+            return "JcrMessage[eventIterator: " + eventIterator + ", eventList: " + eventList + "]";
+        }
+
+        return "JcrMessage@" + ObjectHelper.getIdentityHashCode(this);
+    }
+
+    @Override
+    public void copyFrom(org.apache.camel.Message that) {
+        if (that == this) {
+            // same instance: clearing the headers and attachments below would wipe our own data
+            return;
+        }
+
+        // must initialize headers before we copy the JCR state to avoid Camel
+        // populating them before we do the copy
+        getHeaders().clear();
+
+        // copy body and fault flag; reading the body of a JcrMessage may populate its event list
+        setBody(that.getBody());
+        setFault(that.isFault());
+
+        if (that instanceof JcrMessage) {
+            // copied after the body was read so an event list created by that read is carried over
+            JcrMessage thatMessage = (JcrMessage) that;
+            this.eventIterator = thatMessage.eventIterator;
+            this.eventList = thatMessage.eventList;
+        }
+
+        // we have already cleared the headers
+        if (that.hasHeaders()) {
+            getHeaders().putAll(that.getHeaders());
+        }
+
+        getAttachments().clear();
+
+        if (that.hasAttachments()) {
+            getAttachments().putAll(that.getAttachments());
+        }
+    }
+
+    public EventIterator getEventIterator() {
+        return eventIterator;
+    }
+
+    /**
+     * Builds the body by draining the event iterator into a list. The list is built
+     * only once; later calls return the same list.
+     *
+     * @return the list of events, empty if there is no event iterator
+     */
+    @Override
+    protected Object createBody() {
+        if (eventList == null) {
+            eventList = new LinkedList<Event>();
+
+            if (eventIterator != null) {
+                while (eventIterator.hasNext()) {
+                    eventList.add(eventIterator.nextEvent());
+                }
+            }
+        }
+
+        return eventList;
+    }
+}
Added: camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrConsumerTest.java?rev=1330053&view=auto
==============================================================================
--- camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrConsumerTest.java
(added)
+++ camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrConsumerTest.java
Tue Apr 24 22:56:11 2012
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jcr;
+
+import java.util.List;
+import javax.jcr.Node;
+import javax.jcr.Session;
+import javax.jcr.SimpleCredentials;
+import javax.jcr.observation.Event;
+import javax.jcr.observation.EventIterator;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Verifies that a route consuming from a JCR endpoint receives the event raised
+ * when a node is added below the observed path.
+ *
+ * @version $Id$
+ */
+public class JcrConsumerTest extends JcrRouteTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JcrConsumerTest.class);
+
+    /** Upper bound in milliseconds on how long the test waits for the event to arrive. */
+    private static final long RECEIVE_TIMEOUT_MILLIS = 30000L;
+
+    private String absPath = "/home/test";
+    private int eventTypes = Event.NODE_ADDED;
+    private boolean deep = true;
+    private String uuids;
+    private String nodeTypeNames;
+    private boolean noLocal;
+
+    @Test
+    public void testJcrConsumer() throws Exception {
+        // start consumer thread first
+        JcrConsumerThread consumerThread = new JcrConsumerThread();
+        consumerThread.start();
+        // wait until the consumer thread has tried to receive event at least once
+        while (consumerThread.getReceiveTrialTimes() < 1) {
+            Thread.sleep(10L);
+        }
+
+        // now create a node under the specified event node path
+
+        Session session = getRepository().login(new SimpleCredentials("user", "pass".toCharArray()));
+
+        try {
+            Node folderNode = session.getRootNode();
+
+            for (String folderNodeName : absPath.split("\\/")) {
+                if (!"".equals(folderNodeName)) {
+                    if (folderNode.hasNode(folderNodeName)) {
+                        folderNode.getNode(folderNodeName).remove();
+                    }
+
+                    folderNode = folderNode.addNode(folderNodeName, "nt:unstructured");
+                }
+            }
+
+            folderNode.addNode("node", "nt:unstructured");
+            session.save();
+        } finally {
+            if (session != null && session.isLive()) {
+                session.logout();
+            }
+        }
+
+        // wait until the consumer thread captures an event, but never forever:
+        // a missed event must fail the test instead of hanging the build
+        consumerThread.join(RECEIVE_TIMEOUT_MILLIS);
+        // ends the polling loop if it is still running; no effect on a finished thread
+        consumerThread.interrupt();
+
+        Exchange exchange = consumerThread.getExchange();
+        assertNotNull(exchange);
+
+        Message message = exchange.getIn();
+        assertNotNull(message);
+        assertTrue(message instanceof JcrMessage);
+        EventIterator eventIterator = ((JcrMessage)message).getEventIterator();
+        assertNotNull(eventIterator);
+        assertEquals(1, eventIterator.getSize());
+
+        Object body = message.getBody();
+        assertTrue(body instanceof List);
+        @SuppressWarnings("unchecked")
+        List<Event> eventList = (List<Event>)body;
+        assertEquals(1, eventList.size());
+        Event event = eventList.get(0);
+        assertEquals(Event.NODE_ADDED, event.getType());
+        assertNotNull(event.getPath());
+        assertTrue(event.getPath().startsWith(absPath));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                String uri = "jcr://user:pass@repository";
+                uri += absPath;
+                uri += "?eventTypes=" + eventTypes;
+                uri += "&deep=" + deep;
+                uri += "&noLocal=" + noLocal;
+                from(uri).to("direct:a");
+            }
+        };
+    }
+
+    /**
+     * Polls <tt>direct:a</tt> until an exchange arrives or the thread is interrupted.
+     */
+    private class JcrConsumerThread extends Thread {
+
+        // written by this thread and read by the test thread, hence volatile
+        private volatile Exchange exchange;
+        private volatile int receiveTrialTimes;
+
+        public void run() {
+            while (exchange == null) {
+                exchange = consumer.receive("direct:a", 10L);
+                // this thread is the only writer, so the non-atomic increment is safe
+                ++receiveTrialTimes;
+
+                try {
+                    Thread.sleep(10);
+                } catch (InterruptedException e) {
+                    // restore the interrupt status before leaving the loop
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+
+            LOG.debug("JcrConsumerThread receive exchange, {} after {} trials", exchange, receiveTrialTimes);
+        }
+
+        public Exchange getExchange() {
+            return exchange;
+        }
+
+        public int getReceiveTrialTimes() {
+            return receiveTrialTimes;
+        }
+    }
+}
Added: camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrProducerTest.java?rev=1330053&view=auto
==============================================================================
--- camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrProducerTest.java
(added)
+++ camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrProducerTest.java
Tue Apr 24 22:56:11 2012
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jcr;
+
+import javax.jcr.Node;
+import javax.jcr.Repository;
+import javax.jcr.Session;
+import javax.jcr.SimpleCredentials;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Sends a message through a route ending in a JCR endpoint and checks the node
+ * that was created in the repository.
+ */
+public class JcrProducerTest extends JcrRouteTestSupport {
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        // start from an empty repository directory
+        deleteDirectory("target/repository");
+        super.setUp();
+    }
+
+    @Test
+    public void testJcrProducer() throws Exception {
+        Exchange request = createExchangeWithBody("<hello>world!</hello>");
+        Exchange reply = template.send("direct:a", request);
+        assertNotNull(reply);
+
+        // the OUT body is used as the identifier of the node to look up
+        String nodeId = reply.getOut().getBody(String.class);
+
+        SimpleCredentials credentials = new SimpleCredentials("user", "pass".toCharArray());
+        Session jcrSession = getRepository().login(credentials);
+        try {
+            Node created = jcrSession.getNodeByIdentifier(nodeId);
+            assertNotNull(created);
+            assertEquals("/home/test/node", created.getPath());
+            assertEquals("<hello>world!</hello>", created.getProperty("my.contents.property").getString());
+        } finally {
+            if (jcrSession != null && jcrSession.isLive()) {
+                jcrSession.logout();
+            }
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // START SNIPPET: jcr
+                from("direct:a")
+                    .setProperty(JcrConstants.JCR_NODE_NAME, constant("node"))
+                    .setProperty("my.contents.property", body())
+                    .to("jcr://user:pass@repository/home/test");
+                // END SNIPPET: jcr
+            }
+        };
+    }
+
+}
Added: camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrRouteTestSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrRouteTestSupport.java?rev=1330053&view=auto
==============================================================================
--- camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrRouteTestSupport.java
(added)
+++ camel/trunk/components/camel-jcr/src/test/java/org/apache/camel/component/jcr/JcrRouteTestSupport.java
Tue Apr 24 22:56:11 2012
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jcr;
+
+import javax.jcr.Repository;
+import javax.naming.Context;
+
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.jackrabbit.core.TransientRepository;
+import org.junit.Before;
+
+/**
+ * Base class for the JCR route tests. It binds a {@link TransientRepository}
+ * under the JNDI name <tt>repository</tt> and removes the repository directory
+ * before every test.
+ *
+ * @version $Id$
+ */
+public abstract class JcrRouteTestSupport extends CamelTestSupport {
+
+    private Repository repository;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        // start every test from an empty repository directory
+        deleteDirectory("target/repository");
+        super.setUp();
+    }
+
+    /**
+     * @return the repository created in {@link #createJndiContext()}
+     */
+    protected Repository getRepository() {
+        return repository;
+    }
+
+    @Override
+    protected Context createJndiContext() throws Exception {
+        Context jndi = super.createJndiContext();
+        repository = new TransientRepository("target/repository.xml", "target/repository");
+        jndi.bind("repository", repository);
+        return jndi;
+    }
+}
Modified: camel/trunk/components/camel-jcr/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jcr/src/test/resources/log4j.properties?rev=1330053&r1=1330052&r2=1330053&view=diff
==============================================================================
--- camel/trunk/components/camel-jcr/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-jcr/src/test/resources/log4j.properties Tue Apr 24 22:56:11
2012
@@ -19,6 +19,8 @@
# The logging properties used for testing
#
log4j.rootLogger=INFO, file
+log4j.logger.org.apache.camel.component.jcr = TRACE, file
+log4j.additivity.org.apache.camel.component.jcr = false
# uncomment the following to enable camel debugging
#log4j.logger.org.apache.camel=DEBUG
|