camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1446682 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/aggregate/ main/java/org/apache/camel/spi/ test/java/org/apache/camel/processor/aggregator/
Date Fri, 15 Feb 2013 16:48:28 GMT
Author: davsclaus
Date: Fri Feb 15 16:48:27 2013
New Revision: 1446682

URL: http://svn.apache.org/r1446682
Log:
CAMEL-6042: Added OptimisticLockingAggregationRepository. Thanks to Aaron Whiteside for the patch.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockingAwareAggregationStrategy.java   (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/OptimisticLockingAggregationRepository.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AbstractDistributedTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedCompletionIntervalTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedConcurrentPerCorrelationKeyTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedTimeoutTest.java   (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=1446682&r1=1446681&r2=1446682&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Fri Feb 15 16:48:27 2013
@@ -73,6 +73,8 @@ public class AggregateDefinition extends
     @XmlAttribute
     private Boolean parallelProcessing;
     @XmlAttribute
+    private Boolean optimisticLocking;
+    @XmlAttribute
     private String executorServiceRef;
     @XmlAttribute
     private String timeoutCheckerExecutorServiceRef;
@@ -195,6 +197,7 @@ public class AggregateDefinition extends
 
         // set other options
         answer.setParallelProcessing(isParallelProcessing());
+        answer.setOptimisticLocking(isOptimisticLocking());
         if (getCompletionPredicate() != null) {
             Predicate predicate = getCompletionPredicate().createPredicate(routeContext);
             answer.setCompletionPredicate(predicate);
@@ -385,6 +388,18 @@ public class AggregateDefinition extends
         this.executorService = executorService;
     }
 
+    public Boolean getOptimisticLocking() {
+        return optimisticLocking;
+    }
+
+    public void setOptimisticLocking(boolean optimisticLocking) {
+        this.optimisticLocking = optimisticLocking;
+    }
+
+    public boolean isOptimisticLocking() {
+        return optimisticLocking != null && optimisticLocking;
+    }
+
     public Boolean getParallelProcessing() {
         return parallelProcessing;
     }
@@ -718,6 +733,11 @@ public class AggregateDefinition extends
         setParallelProcessing(true);
         return this;
     }
+
+    public AggregateDefinition optimisticLocking() {
+        setOptimisticLocking(true);
+        return this;
+    }
     
     public AggregateDefinition executorService(ExecutorService executorService) {
         setExecutorService(executorService);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1446682&r1=1446681&r2=1446682&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Fri Feb 15 16:48:27 2013
@@ -17,13 +17,12 @@
 package org.apache.camel.processor.aggregate;
 
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -46,6 +45,7 @@ import org.apache.camel.Traceable;
 import org.apache.camel.impl.LoggingExceptionHandler;
 import org.apache.camel.spi.AggregationRepository;
 import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.spi.OptimisticLockingAggregationRepository;
 import org.apache.camel.spi.RecoverableAggregationRepository;
 import org.apache.camel.spi.ShutdownPrepared;
 import org.apache.camel.spi.Synchronization;
@@ -94,10 +94,10 @@ public class AggregateProcessor extends 
     // store correlation key -> exchange id in timeout map
     private TimeoutMap<String, String> timeoutMap;
     private ExceptionHandler exceptionHandler = new LoggingExceptionHandler(getClass());
-    private AggregationRepository aggregationRepository = new MemoryAggregationRepository();
-    private Map<Object, Object> closedCorrelationKeys;
-    private Set<String> batchConsumerCorrelationKeys = new LinkedHashSet<String>();
-    private final Set<String> inProgressCompleteExchanges = new HashSet<String>();
+    private AggregationRepository aggregationRepository;
+    private Map<String, String> closedCorrelationKeys;
+    private final Set<String> batchConsumerCorrelationKeys = new ConcurrentSkipListSet<String>();
+    private final Set<String> inProgressCompleteExchanges = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
     private final Map<String, RedeliveryData> redeliveryState = new ConcurrentHashMap<String, RedeliveryData>();
 
     // keep booking about redelivery
@@ -109,6 +109,7 @@ public class AggregateProcessor extends 
     private boolean ignoreInvalidCorrelationKeys;
     private Integer closeCorrelationKeyOnCompletion;
     private boolean parallelProcessing;
+    private boolean optimisticLocking;
 
     // different ways to have completion triggered
     private boolean eagerCheckCompletion;
@@ -189,18 +190,38 @@ public class AggregateProcessor extends 
             throw new ClosedCorrelationKeyException(key, exchange);
         }
 
-        // copy exchange, and do not share the unit of work
-        // the aggregated output runs in another unit of work
-        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
-
-        // when memory based then its fast using synchronized, but if the aggregation repository is IO
-        // bound such as JPA etc then concurrent aggregation per correlation key could
-        // improve performance as we can run aggregation repository get/add in parallel
-        lock.lock();
-        try {
-            doAggregation(key, copy);
-        } finally {
-            lock.unlock();
+        //
+        // todo: explain optimistic lock handling
+        if (optimisticLocking) {
+            boolean done = false;
+            int attempt = 0;
+            while (!done) {
+                attempt++;
+                // copy exchange, and do not share the unit of work
+                // the aggregated output runs in another unit of work
+                Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
+                try {
+                    doAggregation(key, copy);
+                    done = true;
+                } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) {
+                    LOG.trace("On attempt {} OptimisticLockingAggregationRepository: {} threw OptimisticLockingException while trying to add() key: {} and exchange: {}",
+                              new Object[]{attempt, aggregationRepository, key, copy, e});
+                }
+            }
+        } else {
+            // copy exchange, and do not share the unit of work
+            // the aggregated output runs in another unit of work
+            Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
+
+            // when memory based then its fast using synchronized, but if the aggregation repository is IO
+            // bound such as JPA etc then concurrent aggregation per correlation key could
+            // improve performance as we can run aggregation repository get/add in parallel
+            lock.lock();
+            try {
+                doAggregation(key, copy);
+            } finally {
+                lock.unlock();
+            }
         }
     }
 
@@ -211,19 +232,24 @@ public class AggregateProcessor extends 
      * in parallel.
      *
      * @param key      the correlation key
-     * @param exchange the exchange
+     * @param newExchange the exchange
      * @return the aggregated exchange
      * @throws org.apache.camel.CamelExchangeException is thrown if error aggregating
      */
-    private Exchange doAggregation(String key, Exchange exchange) throws CamelExchangeException {
+    private Exchange doAggregation(String key, Exchange newExchange) throws CamelExchangeException {
         LOG.trace("onAggregation +++ start +++ with correlation key: {}", key);
 
         Exchange answer;
-        Exchange oldExchange = aggregationRepository.get(exchange.getContext(), key);
-        Exchange newExchange = exchange;
+        Exchange originalExchange = aggregationRepository.get(newExchange.getContext(), key);
+        Exchange oldExchange = originalExchange;
 
         Integer size = 1;
         if (oldExchange != null) {
+            // hack to support legacy AggregationStrategy's that modify and return the oldExchange, these will not
+            // working when using an identify based approach for optimistic locking like the MemoryAggregationRepository.
+            if (optimisticLocking && aggregationRepository instanceof MemoryAggregationRepository) {
+                oldExchange = originalExchange.copy();
+            }
             size = oldExchange.getProperty(Exchange.AGGREGATED_SIZE, 0, Integer.class);
             size++;
         }
@@ -238,16 +264,16 @@ public class AggregateProcessor extends 
             newExchange.removeProperty(Exchange.AGGREGATED_SIZE);
         }
 
-        // prepare the exchanges for aggregation and aggregate it
+        // prepare the exchanges for aggregation and then aggregate them
         ExchangeHelper.prepareAggregation(oldExchange, newExchange);
         // must catch any exception from aggregation
         try {
-            answer = onAggregation(oldExchange, exchange);
+            answer = onAggregation(oldExchange, newExchange);
         } catch (Throwable e) {
-            throw new CamelExchangeException("Error occurred during aggregation", exchange, e);
+            throw new CamelExchangeException("Error occurred during aggregation", newExchange, e);
         }
         if (answer == null) {
-            throw new CamelExchangeException("AggregationStrategy " + aggregationStrategy + " returned null which is not allowed", exchange);
+            throw new CamelExchangeException("AggregationStrategy " + aggregationStrategy + " returned null which is not allowed", newExchange);
         }
 
         // update the aggregated size
@@ -260,8 +286,7 @@ public class AggregateProcessor extends 
 
         // only need to update aggregation repository if we are not complete
         if (complete == null) {
-            LOG.trace("In progress aggregated exchange: {} with correlation key: {}", answer, key);
-            aggregationRepository.add(exchange.getContext(), key, answer);
+            doAggregationRepositoryAdd(newExchange.getContext(), key, originalExchange, answer);
         } else {
             // if batch consumer completion is enabled then we need to complete the group
             if ("consumer".equals(complete)) {
@@ -276,14 +301,14 @@ public class AggregateProcessor extends 
 
                     if (batchAnswer != null) {
                         batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
-                        onCompletion(batchKey, batchAnswer, false);
+                        onCompletion(batchKey, originalExchange, batchAnswer, false);
                     }
                 }
                 batchConsumerCorrelationKeys.clear();
             } else {
                 // we are complete for this exchange
                 answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
-                onCompletion(key, answer, false);
+                onCompletion(key, originalExchange, answer, false);
             }
         }
 
@@ -292,6 +317,28 @@ public class AggregateProcessor extends 
         return answer;
     }
 
+    protected void doAggregationRepositoryAdd(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) {
+        LOG.trace("In progress aggregated oldExchange: {}, newExchange: {} with correlation key: {}", new Object[]{oldExchange, newExchange, key});
+        if (optimisticLocking) {
+            try {
+                ((OptimisticLockingAggregationRepository)aggregationRepository).add(camelContext, key, oldExchange, newExchange);
+            } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) {
+                onOptimisticLockingFailure(oldExchange, newExchange);
+                throw e;
+            }
+        } else {
+            aggregationRepository.add(camelContext, key, newExchange);
+        }
+    }
+
+    protected void onOptimisticLockingFailure(Exchange oldExchange, Exchange newExchange) {
+        if (aggregationStrategy instanceof OptimisticLockingAwareAggregationStrategy) {
+            LOG.trace("onOptimisticLockFailure with AggregationStrategy: {}, oldExchange: {}, newExchange: {}",
+                      new Object[]{aggregationStrategy, oldExchange, newExchange});
+            ((OptimisticLockingAwareAggregationStrategy)aggregationStrategy).onOptimisticLockFailure(oldExchange, newExchange);
+        }
+    }
+
     /**
      * Tests whether the given exchange is complete or not
      *
@@ -368,11 +415,13 @@ public class AggregateProcessor extends 
         return aggregationStrategy.aggregate(oldExchange, newExchange);
     }
 
-    protected void onCompletion(final String key, final Exchange exchange, boolean fromTimeout) {
+    protected void onCompletion(final String key, final Exchange original, final Exchange aggregated, boolean fromTimeout) {
+        // remove from repository as its completed, we do this first as to trigger any OptimisticLockingException's
+        aggregationRepository.remove(aggregated.getContext(), key, original);
+
         // store the correlation key as property
-        exchange.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key);
-        // remove from repository as its completed
-        aggregationRepository.remove(exchange.getContext(), key, exchange);
+        aggregated.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key);
+
         if (!fromTimeout && timeoutMap != null) {
             // cleanup timeout map if it was a incoming exchange which triggered the timeout (and not the timeout checker)
             timeoutMap.remove(key);
@@ -388,24 +437,24 @@ public class AggregateProcessor extends 
             // to allow any custom processing before discarding the exchange
             if (aggregationStrategy instanceof TimeoutAwareAggregationStrategy) {
                 long timeout = getCompletionTimeout() > 0 ? getCompletionTimeout() : -1;
-                ((TimeoutAwareAggregationStrategy) aggregationStrategy).timeout(exchange, -1, -1, timeout);
+                ((TimeoutAwareAggregationStrategy) aggregationStrategy).timeout(aggregated, -1, -1, timeout);
             }
         }
 
         if (fromTimeout && isDiscardOnCompletionTimeout()) {
             // discard due timeout
-            LOG.debug("Aggregation for correlation key {} discarding aggregated exchange: ()", key, exchange);
+            LOG.debug("Aggregation for correlation key {} discarding aggregated exchange: {}", key, aggregated);
             // must confirm the discarded exchange
-            aggregationRepository.confirm(exchange.getContext(), exchange.getExchangeId());
+            aggregationRepository.confirm(aggregated.getContext(), aggregated.getExchangeId());
             // and remove redelivery state as well
-            redeliveryState.remove(exchange.getExchangeId());
+            redeliveryState.remove(aggregated.getExchangeId());
         } else {
             // the aggregated exchange should be published (sent out)
-            onSubmitCompletion(key, exchange);
+            onSubmitCompletion(key, aggregated);
         }
     }
 
-    private void onSubmitCompletion(final Object key, final Exchange exchange) {
+    private void onSubmitCompletion(final String key, final Exchange exchange) {
         LOG.debug("Aggregation complete for correlation key {} sending aggregated exchange: {}", key, exchange);
 
         // add this as in progress before we submit the task
@@ -445,7 +494,7 @@ public class AggregateProcessor extends 
      * Restores the timeout map with timeout values from the aggregation repository.
      * <p/>
      * This is needed in case the aggregator has been stopped and started again (for example a server restart).
-     * Then the existing exchanges from the {@link AggregationRepository} must have its timeout conditions restored.
+     * Then the existing exchanges from the {@link AggregationRepository} must have their timeout conditions restored.
      */
     protected void restoreTimeoutMapFromAggregationRepository() throws Exception {
         // grab the timeout value for each partly aggregated exchange
@@ -581,6 +630,14 @@ public class AggregateProcessor extends 
         this.parallelProcessing = parallelProcessing;
     }
 
+    public boolean isOptimisticLocking() {
+        return optimisticLocking;
+    }
+
+    public void setOptimisticLocking(boolean optimisticLocking) {
+        this.optimisticLocking = optimisticLocking;
+    }
+
     public AggregationRepository getAggregationRepository() {
         return aggregationRepository;
     }
@@ -663,17 +720,17 @@ public class AggregateProcessor extends 
 
         private AggregationTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
             // do NOT use locking on the timeout map as this aggregator has its own shared lock we will use instead
-            super(executor, requestMapPollTimeMillis, false);
+            super(executor, requestMapPollTimeMillis, optimisticLocking);
         }
 
         @Override
         public void purge() {
             // must acquire the shared aggregation lock to be able to purge
-            lock.lock();
+            if (!optimisticLocking) { lock.lock(); }
             try {
                 super.purge();
             } finally {
-                lock.unlock();
+                if (!optimisticLocking) { lock.unlock(); }
             }
         }
 
@@ -688,11 +745,23 @@ public class AggregateProcessor extends 
             }
 
             // get the aggregated exchange
+            boolean evictionStolen = false;
             Exchange answer = aggregationRepository.get(camelContext, key);
-            if (answer != null) {
+            if (answer == null) {
+                evictionStolen = true;
+            } else {
                 // indicate it was completed by timeout
                 answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
-                onCompletion(key, answer, true);
+                try {
+                    onCompletion(key, answer, answer, true);
+                } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) {
+                    evictionStolen = true;
+                }
+            }
+
+            if (optimisticLocking && evictionStolen) {
+                LOG.debug("Another Camel instance has already successfully correlated or processed this timeout eviction "
+                          + "for exchange with id: {} and correlation id: {}", exchangeId, key);
             }
             return true;
         }
@@ -717,19 +786,29 @@ public class AggregateProcessor extends 
 
             if (keys != null && !keys.isEmpty()) {
                 // must acquire the shared aggregation lock to be able to trigger interval completion
-                lock.lock();
+                if (!optimisticLocking) { lock.lock(); }
                 try {
                     for (String key : keys) {
+                        boolean stolenInterval = false;
                         Exchange exchange = aggregationRepository.get(camelContext, key);
-                        if (exchange != null) {
+                        if (exchange == null) {
+                            stolenInterval = true;
+                        } else {
                             LOG.trace("Completion interval triggered for correlation key: {}", key);
                             // indicate it was completed by interval
                             exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "interval");
-                            onCompletion(key, exchange, false);
+                            try {
+                                onCompletion(key, exchange, exchange, false);
+                            } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) {
+                                stolenInterval = true;
+                            }
+                        }
+                        if (optimisticLocking && stolenInterval) {
+                            LOG.debug("Another Camel instance has already processed this interval aggregation for exchange with correlation id: {}", key);
                         }
                     }
                 } finally {
-                    lock.unlock();
+                    if (!optimisticLocking) { lock.unlock(); }
                 }
             }
 
@@ -842,13 +921,25 @@ public class AggregateProcessor extends 
         if (getCloseCorrelationKeyOnCompletion() != null) {
             if (getCloseCorrelationKeyOnCompletion() > 0) {
                 LOG.info("Using ClosedCorrelationKeys with a LRUCache with a capacity of " + getCloseCorrelationKeyOnCompletion());
-                closedCorrelationKeys = new LRUCache<Object, Object>(getCloseCorrelationKeyOnCompletion());
+                closedCorrelationKeys = new LRUCache<String, String>(getCloseCorrelationKeyOnCompletion());
             } else {
                 LOG.info("Using ClosedCorrelationKeys with unbounded capacity");
-                closedCorrelationKeys = new HashMap<Object, Object>();
+                closedCorrelationKeys = new ConcurrentHashMap<String, String>();
             }
         }
 
+        if (aggregationRepository == null) {
+            aggregationRepository = new MemoryAggregationRepository(optimisticLocking);
+            LOG.info("Defaulting to MemoryAggregationRepository");
+        }
+
+        if (optimisticLocking) {
+            if (!(aggregationRepository instanceof OptimisticLockingAggregationRepository)) {
+                throw new IllegalArgumentException("Optimistic locking cannot be enabled without using an AggregationRepository that implements OptimisticLockingAggregationRepository");
+            }
+            LOG.info("Optimistic locking is enabled");
+        }
+
         ServiceHelper.startServices(aggregationStrategy, processor, aggregationRepository);
 
         // should we use recover checker
@@ -1000,7 +1091,7 @@ public class AggregateProcessor extends 
         int total = 0;
         if (keys != null && !keys.isEmpty()) {
             // must acquire the shared aggregation lock to be able to trigger force completion
-            lock.lock();
+            if (!optimisticLocking) { lock.lock(); }
             total = keys.size();
             try {
                 for (String key : keys) {
@@ -1009,11 +1100,11 @@ public class AggregateProcessor extends 
                         LOG.trace("Force completion triggered for correlation key: {}", key);
                         // indicate it was completed by a force completion request
                         exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
-                        onCompletion(key, exchange, false);
+                        onCompletion(key, exchange, exchange, false);
                     }
                 }
             } finally {
-                lock.unlock();
+                if (!optimisticLocking) { lock.unlock(); }
             }
         }
         LOG.trace("Completed force completion of all groups task");

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java?rev=1446682&r1=1446681&r2=1446682&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java Fri Feb 15 16:48:27 2013
@@ -17,24 +17,50 @@
 package org.apache.camel.processor.aggregate;
 
 import java.util.Collections;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
-import org.apache.camel.spi.AggregationRepository;
+import org.apache.camel.spi.OptimisticLockingAggregationRepository;
 import org.apache.camel.support.ServiceSupport;
 
 /**
- * A memory based {@link org.apache.camel.spi.AggregationRepository} which stores in memory only.
+ * A memory based {@link org.apache.camel.spi.AggregationRepository} which stores {@link Exchange}s in memory only.
+ *
+ * Supports both optimistic locking and non-optimistic locking modes. Defaults to non-optimistic locking mode.
  *
  * @version 
  */
-public class MemoryAggregationRepository extends ServiceSupport implements AggregationRepository {
-    private final Map<String, Exchange> cache = new ConcurrentHashMap<String, Exchange>();
+public class MemoryAggregationRepository extends ServiceSupport implements OptimisticLockingAggregationRepository {
+    private final ConcurrentMap<String, Exchange> cache = new ConcurrentHashMap<String, Exchange>();
+    private final boolean optimisticLocking;
+
+    public MemoryAggregationRepository() {
+        this(false);
+    }
+
+    public MemoryAggregationRepository(boolean optimisticLocking) {
+        this.optimisticLocking = optimisticLocking;
+    }
+
+    public Exchange add(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) {
+        if (!optimisticLocking) { throw new UnsupportedOperationException(); }
+        if (oldExchange == null) {
+            if (cache.putIfAbsent(key, newExchange) != null) {
+                throw new OptimisticLockingException();
+            }
+        } else {
+            if (!cache.replace(key, oldExchange, newExchange)) {
+                throw new OptimisticLockingException();
+            }
+        }
+        return oldExchange;
+    }
 
     public Exchange add(CamelContext camelContext, String key, Exchange exchange) {
+        if (optimisticLocking) { throw new UnsupportedOperationException(); }
         return cache.put(key, exchange);
     }
 
@@ -43,7 +69,13 @@ public class MemoryAggregationRepository
     }
 
     public void remove(CamelContext camelContext, String key, Exchange exchange) {
-        cache.remove(key);
+        if (optimisticLocking) {
+            if (!cache.remove(key, exchange)) {
+                throw new OptimisticLockingException();
+            }
+        } else {
+            cache.remove(key);
+        }
     }
 
     public void confirm(CamelContext camelContext, String exchangeId) {

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockingAwareAggregationStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockingAwareAggregationStrategy.java?rev=1446682&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockingAwareAggregationStrategy.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockingAwareAggregationStrategy.java Fri Feb 15 16:48:27 2013
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import org.apache.camel.Exchange;
+
+/**
+ * A specialized {@link AggregationStrategy} which gets a callback when the aggregated {@link Exchange} fails to add
+ * in the {@link org.apache.camel.spi.OptimisticLockingAggregationRepository} because of
+ * an {@link org.apache.camel.spi.OptimisticLockingAggregationRepository.OptimisticLockingException}.
+ *
+ * Please note that when aggregating {@link Exchange}'s to be careful not to modify and return the {@code oldExchange}
+ * from the {@code aggregate()} method. If you are using the default MemoryAggregationRepository this will mean you have
+ * modified the value of an object already referenced/stored by the MemoryAggregationRepository. This makes it impossible
+ * for optimistic locking to work correctly with the MemoryAggregationRepository.
+ *
+ * You should instead return either the new {@code newExchange} or a completely new instance of {@link Exchange}. This
+ * is due to the nature of how the underlying {@link java.util.concurrent.ConcurrentHashMap} performs CAS operations on the value identity.
+ *
+ * @version
+ */
+public interface OptimisticLockingAwareAggregationStrategy extends AggregationStrategy {
+
+    // TODO: In Camel 3.0 we should move this to org.apache.camel package
+
+    void onOptimisticLockFailure(Exchange oldExchange, Exchange newExchange);
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockingAwareAggregationStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockingAwareAggregationStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/OptimisticLockingAggregationRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/OptimisticLockingAggregationRepository.java?rev=1446682&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/OptimisticLockingAggregationRepository.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/OptimisticLockingAggregationRepository.java Fri Feb 15 16:48:27 2013
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+
+/**
+ * A specialized {@link org.apache.camel.spi.AggregationRepository} which also supports
+ * optimistic locking.
+ *
+ * If the underlying implementation cannot perform optimistic locking, it should
+ * not implement this interface.
+ *
+ * @version
+ */
+public interface OptimisticLockingAggregationRepository extends AggregationRepository {
+
+    /**
+     * {@link Exception} used by an {@code AggregationRepository} to indicate that an optimistic
+     * update error has occurred and that the operation should be retried by the caller.
+     * <p/>
+     */
+    public static class OptimisticLockingException extends RuntimeException { }
+
+    /**
+     * Add the given {@link org.apache.camel.Exchange} under the correlation key.
+     * <p/>
+     * Will perform optimistic locking to replace expected existing exchange with
+     * the new supplied exchange.
+     *
+     * @param camelContext   the current CamelContext
+     * @param key            the correlation key
+     * @param oldExchange    the old exchange that is expected to exist when replacing with the new exchange
+     * @param newExchange    the new aggregated exchange, to replace old exchange
+     * @return the old exchange if any existed
+     * @throws OptimisticLockingException This should be thrown when the currently stored exchange differs from the supplied oldExchange.
+     */
+    Exchange add(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) throws OptimisticLockingException;
+
+    /**
+     *
+     * @param camelContext   the current CamelContext
+     * @param key            the correlation key
+     * @param exchange       the exchange to remove
+     * @throws OptimisticLockingException This should be thrown when the exchange has already been deleted, or otherwise modified.
+     */
+    void remove(CamelContext camelContext, String key, Exchange exchange) throws OptimisticLockingException;
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/OptimisticLockingAggregationRepository.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/OptimisticLockingAggregationRepository.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AbstractDistributedTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AbstractDistributedTest.java?rev=1446682&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AbstractDistributedTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AbstractDistributedTest.java Fri Feb 15 16:48:27 2013
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * @version
+ */
+public abstract class AbstractDistributedTest extends ContextTestSupport {
+
+    protected CamelContext context2;
+    protected ProducerTemplate template2;
+
+    public void setUp() throws Exception {
+        super.setUp();
+
+        context2 = new DefaultCamelContext();
+        template2 = context2.createProducerTemplate();
+        ServiceHelper.startServices(template2, context2);
+
+        // add routes after CamelContext has been started
+        context2.addRoutes(createRouteBuilder());
+    }
+
+    public void tearDown() throws Exception {
+        ServiceHelper.stopAndShutdownServices(context2, template2);
+
+        super.tearDown();
+    }
+
+    protected MockEndpoint getMockEndpoint2(String uri) {
+        return context2.getEndpoint(uri, MockEndpoint.class);
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AbstractDistributedTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AbstractDistributedTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedCompletionIntervalTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedCompletionIntervalTest.java?rev=1446682&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedCompletionIntervalTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedCompletionIntervalTest.java Fri Feb 15 16:48:27 2013
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
+
+/**
+ * Unit test to verify that aggregate by interval only also works.
+ *
+ * @version 
+ */
+public class DistributedCompletionIntervalTest extends AbstractDistributedTest {
+
+    public void testAggregateInterval() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        MockEndpoint mock2 = getMockEndpoint2("mock:result");
+        // by default the use latest aggregation strategy is used so we get message 18 and message 19
+        mock.expectedBodiesReceived("Message 18");
+        mock2.expectedBodiesReceived("Message 19");
+
+        // ensure messages are send after the 1s
+        Thread.sleep(2000);
+        
+        for (int i = 0; i < 20; i++) {
+            int choice = i % 2;
+            if (choice == 0) {
+                template.sendBodyAndHeader("direct:start", "Message " + i, "id", "1");
+            } else {
+                template2.sendBodyAndHeader("direct:start", "Message " + i, "id", "1");
+            }
+        }
+
+        mock.assertIsSatisfied();
+        mock2.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                from("direct:start")
+                    .aggregate(header("id"), new UseLatestAggregationStrategy())
+                        // trigger completion every 5th second
+                        .completionInterval(5000)
+                    .to("mock:result");
+                // END SNIPPET: e1
+            }
+        };
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedCompletionIntervalTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedCompletionIntervalTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedConcurrentPerCorrelationKeyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedConcurrentPerCorrelationKeyTest.java?rev=1446682&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedConcurrentPerCorrelationKeyTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedConcurrentPerCorrelationKeyTest.java Fri Feb 15 16:48:27 2013
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.BodyInAggregatingStrategy;
+import org.apache.camel.processor.aggregate.MemoryAggregationRepository;
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class DistributedConcurrentPerCorrelationKeyTest extends AbstractDistributedTest {
+
+    private MemoryAggregationRepository sharedAggregationRepository = new MemoryAggregationRepository(true);
+
+    private int size = 200;
+    private final String uri = "direct:start";
+
+    @Test
+    public void testAggregateConcurrentPerCorrelationKey() throws Exception {
+        ExecutorService service = Executors.newFixedThreadPool(50);
+        List<Callable<Object>> tasks = new ArrayList<Callable<Object>>();
+        for (int i = 0; i < size; i++) {
+            final int id = i % 25;
+            final int choice = i % 2;
+            final int count = i;
+            tasks.add(new Callable<Object>() {
+                public Object call() throws Exception {
+                    if (choice == 0) {
+                        template.sendBodyAndHeader(uri, "" + count, "id", id);
+                    } else {
+                        template2.sendBodyAndHeader(uri, "" + count, "id", id);
+                    }
+                    return null;
+                }
+            });
+        }
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        MockEndpoint mock2 = getMockEndpoint2("mock:result");
+
+        // submit all tasks
+        service.invokeAll(tasks);
+        service.shutdown();
+        service.awaitTermination(10, TimeUnit.SECONDS);
+
+        int contextCount = mock.getReceivedCounter();
+        int context2Count = mock2.getReceivedCounter();
+
+        assertEquals(25, contextCount + context2Count);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                        .aggregate(header("id"), new BodyInAggregatingStrategy())
+                            .aggregationRepository(sharedAggregationRepository)
+                            .optimisticLocking()
+                            .completionSize(8)
+                        .to("mock:result");
+            }
+        };
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedConcurrentPerCorrelationKeyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedConcurrentPerCorrelationKeyTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedTimeoutTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedTimeoutTest.java?rev=1446682&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedTimeoutTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedTimeoutTest.java Fri Feb 15 16:48:27 2013
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.MemoryAggregationRepository;
+import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy;
+
+/**
+ * @version
+ */
+public class DistributedTimeoutTest extends AbstractDistributedTest {
+
+    private MemoryAggregationRepository sharedAggregationRepository = new MemoryAggregationRepository(true);
+
+    private final AtomicInteger invoked = new AtomicInteger();
+    private volatile Exchange receivedExchange;
+    private volatile int receivedIndex;
+    private volatile int receivedTotal;
+    private volatile long receivedTimeout;
+
+    public void testAggregateTimeout() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:aggregated");
+        MockEndpoint mock2 = getMockEndpoint2("mock:aggregated");
+        mock.expectedMessageCount(0);
+        mock2.expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template2.sendBodyAndHeader("direct:start", "B", "id", 123);
+
+        // wait 3 seconds so that the timeout kicks in
+        Thread.sleep(3000);
+
+        mock.assertIsSatisfied();
+        mock2.assertIsSatisfied();
+
+        // should invoke the timeout method
+        assertEquals(1, invoked.get());
+
+        assertNotNull(receivedExchange);
+        assertEquals("AB", receivedExchange.getIn().getBody());
+        assertEquals(-1, receivedIndex);
+        assertEquals(-1, receivedTotal);
+        assertEquals(2000, receivedTimeout);
+
+        mock.reset();
+        mock.expectedMessageCount(0);
+        mock2.reset();
+        mock2.expectedBodiesReceived("ABC");
+
+        // now send 3 exchanges which shouldn't trigger the timeout anymore
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template2.sendBodyAndHeader("direct:start", "B", "id", 123);
+        template2.sendBodyAndHeader("direct:start", "C", "id", 123);
+
+        // should complete before timeout
+        mock2.assertIsSatisfied(2000);
+        mock.assertIsSatisfied(5000);
+
+        // should have not invoked the timeout method anymore
+        assertEquals(1, invoked.get());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                        .aggregate(header("id"), new MyAggregationStrategy())
+                        .aggregationRepository(sharedAggregationRepository)
+                        .optimisticLocking()
+                        .discardOnCompletionTimeout()
+                        .completionSize(3)
+                        .completionTimeout(2000)  // use a 2 second timeout
+                        .to("mock:aggregated");
+            }
+        };
+    }
+
+    private class MyAggregationStrategy implements TimeoutAwareAggregationStrategy {
+
+        public void timeout(Exchange oldExchange, int index, int total, long timeout) {
+            invoked.incrementAndGet();
+
+            // we can't assert on the expected values here as the contract of this method doesn't
+            // allow to throw any Throwable (including AssertionFailedError) so that we assert
+            // about the expected values directly inside the test method itself. other than that
+            // asserting inside a thread other than the main thread dosen't make much sense as
+            // junit would not realize the failed assertion!
+            receivedExchange = oldExchange;
+            receivedIndex = index;
+            receivedTotal = total;
+            receivedTimeout = timeout;
+        }
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            }
+
+            String body = oldExchange.getIn().getBody(String.class);
+            oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
+            return oldExchange;
+        }
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedTimeoutTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedTimeoutTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message