incubator-sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bdelacre...@apache.org
Subject svn commit: r1549567 [2/5] - in /sling/trunk/contrib/extensions/replication: ./ src/main/java/org/apache/sling/replication/agent/ src/main/java/org/apache/sling/replication/agent/impl/ src/main/java/org/apache/sling/replication/communication/ src/main/...
Date Mon, 09 Dec 2013 13:37:50 GMT
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java?rev=1549567&r1=1549566&r2=1549567&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java Mon Dec  9 13:37:48 2013
@@ -27,10 +27,30 @@ import org.apache.sling.replication.seri
  */
 public interface ReplicationQueueDistributionStrategy {
 
+    /**
+     * synchronously distribute a {@link ReplicationPackage} to a {@link ReplicationAgent} to a {@link ReplicationQueue}
+     * provided by the given {@link ReplicationQueueProvider}
+     *
+     * @param replicationPackage a {@link ReplicationPackage} to distribute
+     * @param agent              the {@link ReplicationAgent} to be used for replicating the package
+     * @param queueProvider      the {@link ReplicationQueueProvider} used to provide the queue to be used for the given package
+     * @return a {@link ReplicationQueueItemState} representing the state of the package in the queue after its distribution
+     * @throws ReplicationQueueException
+     */
     ReplicationQueueItemState add(ReplicationPackage replicationPackage, ReplicationAgent agent,
                                   ReplicationQueueProvider queueProvider) throws ReplicationQueueException;
 
-    void offer(ReplicationPackage replicationPackage, ReplicationAgent agent,
-               ReplicationQueueProvider queueProvider) throws ReplicationQueueException;
+    /**
+     * asynchronously distribute a {@link ReplicationPackage} to a {@link ReplicationAgent} to a {@link ReplicationQueue}
+     * provided by the given {@link ReplicationQueueProvider}
+     *
+     * @param replicationPackage a {@link ReplicationPackage} to distribute
+     * @param agent              the {@link ReplicationAgent} to be used for replicating the package
+     * @param queueProvider      the {@link ReplicationQueueProvider} used to provide the queue to be used for the given package
+     * @return <code>true</code> if the package could be distributed to a {@link ReplicationQueue}, <code>false</code> otherwise
+     * @throws ReplicationQueueException
+     */
+    boolean offer(ReplicationPackage replicationPackage, ReplicationAgent agent,
+                  ReplicationQueueProvider queueProvider) throws ReplicationQueueException;
 
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueException.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueException.java?rev=1549567&r1=1549566&r2=1549567&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueException.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueException.java Mon Dec  9 13:37:48 2013
@@ -18,9 +18,12 @@
  */
 package org.apache.sling.replication.queue;
 
+import javax.jcr.RepositoryException;
+
 /**
  * Represents errors happening during queue operations
  */
+@SuppressWarnings("serial")
 public class ReplicationQueueException extends Exception {
 
     public ReplicationQueueException(String message, Exception e) {
@@ -30,4 +33,8 @@ public class ReplicationQueueException e
     public ReplicationQueueException(String string) {
         super(string);
     }
+
+    public ReplicationQueueException(RepositoryException e) {
+        super(e);
+    }
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemState.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemState.java?rev=1549567&r1=1549566&r2=1549567&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemState.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemState.java Mon Dec  9 13:37:48 2013
@@ -26,16 +26,14 @@ public class ReplicationQueueItemState {
 
     private int attempts;
 
-    private boolean successfull;
-
     private ItemState state;
 
-    public boolean isSuccessfull() {
+    public boolean isSuccessful() {
         return ItemState.SUCCEEDED.equals(state);
     }
 
-    public void setSuccessfull(boolean successfull) {
-        state = successfull ? ItemState.SUCCEEDED : ItemState.ERROR;
+    public void setSuccessful(boolean successful) {
+        state = successful ? ItemState.SUCCEEDED : ItemState.ERROR;
     }
 
     public int getAttempts() {
@@ -56,9 +54,7 @@ public class ReplicationQueueItemState {
 
     @Override
     public String toString() {
-        return new StringBuilder("{\"attempts\":\"").append(attempts).append("\",\"")
-                        .append("successfull\":\"").append(successfull).append("\",\"")
-                        .append("state\":\"").append(state).append("\"}").toString();
+        return "{\"attempts\":\"" + attempts + "\",\"" + "successful\":\"" + isSuccessful() + "\",\"" + "state\":\"" + state + "\"}";
     }
 
     public enum ItemState {

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java?rev=1549567&r1=1549566&r2=1549567&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java Mon Dec  9 13:37:48 2013
@@ -39,7 +39,7 @@ public interface ReplicationQueueProvide
      * @return a replication queue to be used for the given parameters
      * @throws ReplicationQueueException
      */
-    ReplicationQueue getOrCreateQueue(ReplicationAgent agent, String name)
+    ReplicationQueue getQueue(ReplicationAgent agent, String name)
                     throws ReplicationQueueException;
 
     /**
@@ -53,7 +53,7 @@ public interface ReplicationQueueProvide
      * @return a replication queue to be used for the given parameters
      * @throws ReplicationQueueException
      */
-    ReplicationQueue getOrCreateQueue(ReplicationAgent agent, ReplicationPackage replicationPackage)
+    ReplicationQueue getQueue(ReplicationAgent agent, ReplicationPackage replicationPackage)
                     throws ReplicationQueueException;
 
     /**
@@ -64,7 +64,7 @@ public interface ReplicationQueueProvide
      * @return the default replication queue for the given agent
      * @throws ReplicationQueueException
      */
-    ReplicationQueue getOrCreateDefaultQueue(ReplicationAgent agent)
+    ReplicationQueue getDefaultQueue(ReplicationAgent agent)
                     throws ReplicationQueueException;
 
     /**
@@ -73,7 +73,7 @@ public interface ReplicationQueueProvide
      * @return a collection of replication queues
      * @throws ReplicationQueueException
      */
-    Collection<ReplicationQueue> getAllQueues() throws ReplicationQueueException;
+    Collection<ReplicationQueue> getAllQueues();
 
     /**
      * removes an existing queue owned by this provider

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java?rev=1549567&r1=1549566&r2=1549567&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java Mon Dec  9 13:37:48 2013
@@ -38,22 +38,23 @@ public abstract class AbstractReplicatio
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private Map<String, ReplicationQueue> queueMap = new HashMap<String, ReplicationQueue>();
+    private final Map<String, ReplicationQueue> queueMap = new HashMap<String, ReplicationQueue>();
 
-    public ReplicationQueue getOrCreateQueue(ReplicationAgent agent,
-                    ReplicationPackage replicationPackage) throws ReplicationQueueException {
-        return getOrCreateQueue(agent, replicationPackage.getAction());
+    public ReplicationQueue getQueue(ReplicationAgent agent,
+                                     ReplicationPackage replicationPackage) throws ReplicationQueueException {
+        return getQueue(agent, replicationPackage.getAction());
     }
 
-    public ReplicationQueue getOrCreateQueue(ReplicationAgent agent, String queueName)
+    public ReplicationQueue getQueue(ReplicationAgent agent, String queueName)
                     throws ReplicationQueueException {
-        String key = new StringBuilder(agent.getName()).append(queueName).toString();
+        String key = agent.getName() + queueName;
+
         if (log.isInfoEnabled()) {
             log.info("creating a queue with key {}", key);
         }
         ReplicationQueue queue = queueMap.get(key);
         if (queue == null) {
-            queue = createQueue(agent, queueName);
+            queue = getOrCreateQueue(agent, queueName);
             queueMap.put(key, queue);
             if (log.isInfoEnabled()) {
                 log.info("queue created {}", queue);
@@ -62,11 +63,11 @@ public abstract class AbstractReplicatio
         return queue;
     }
 
-    protected abstract ReplicationQueue createQueue(ReplicationAgent agent, String selector) throws ReplicationQueueException;
+    protected abstract ReplicationQueue getOrCreateQueue(ReplicationAgent agent, String selector) throws ReplicationQueueException;
 
-    public ReplicationQueue getOrCreateDefaultQueue(ReplicationAgent agent)
+    public ReplicationQueue getDefaultQueue(ReplicationAgent agent)
                     throws ReplicationQueueException {
-        return getOrCreateQueue(agent, "");
+        return getQueue(agent, "");
     }
 
     public Collection<ReplicationQueue> getAllQueues() {

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java?rev=1549567&r1=1549566&r2=1549567&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java Mon Dec  9 13:37:48 2013
@@ -23,10 +23,6 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.PropertyOption;
 import org.apache.felix.scr.annotations.Service;
 import org.apache.sling.commons.osgi.PropertiesUtil;
-import org.osgi.service.component.ComponentContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.sling.replication.agent.ReplicationAgent;
 import org.apache.sling.replication.queue.ReplicationQueue;
 import org.apache.sling.replication.queue.ReplicationQueueDistributionStrategy;
@@ -35,6 +31,9 @@ import org.apache.sling.replication.queu
 import org.apache.sling.replication.queue.ReplicationQueueItemState.ItemState;
 import org.apache.sling.replication.queue.ReplicationQueueProvider;
 import org.apache.sling.replication.serialization.ReplicationPackage;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The error strategy for delivering packages to queues. Each agent manages a single queue for
@@ -57,7 +56,7 @@ public class ErrorAwareQueueDistribution
 
     @Property(name = "Stuck Queue Handling", options = {
             @PropertyOption(name = ERROR, value = "Error"),
-            @PropertyOption(name = "DROP", value = "Drop") })
+            @PropertyOption(name = "DROP", value = "Drop")})
     private static final String STUCK_HANDLING = "stuck.handling";
 
     private String stuckQueueHandling;
@@ -66,20 +65,20 @@ public class ErrorAwareQueueDistribution
 
     protected void activate(final ComponentContext ctx) {
         stuckQueueHandling = PropertiesUtil
-                        .toString(ctx.getProperties().get(STUCK_HANDLING), ERROR);
+                .toString(ctx.getProperties().get(STUCK_HANDLING), ERROR);
         attemptsThreshold = PropertiesUtil.toInteger(ctx.getProperties().get(ATTEMPTS_THRESHOLD),
-                        100);
+                100);
     }
 
     public ReplicationQueueItemState add(ReplicationPackage replicationPackage,
-                    ReplicationAgent agent, ReplicationQueueProvider queueProvider)
-                    throws ReplicationQueueException {
+                                         ReplicationAgent agent, ReplicationQueueProvider queueProvider)
+            throws ReplicationQueueException {
         try {
             if (log.isInfoEnabled()) {
-                log.info("using single queue distribution");
+                log.info("using error aware queue distribution");
             }
             ReplicationQueueItemState state = new ReplicationQueueItemState();
-            ReplicationQueue queue = queueProvider.getOrCreateDefaultQueue(agent);
+            ReplicationQueue queue = queueProvider.getDefaultQueue(agent);
             if (log.isInfoEnabled()) {
                 log.info("obtained queue {}", queue);
             }
@@ -94,33 +93,35 @@ public class ErrorAwareQueueDistribution
                         log.error("could not add the item to the queue {}", queue);
                     }
                     state.setItemState(ItemState.ERROR);
-                    state.setSuccessfull(false);
+                    state.setSuccessful(false);
                 }
                 return state;
             } else {
                 throw new ReplicationQueueException("could not get a queue for agent "
-                                + agent.getName());
+                        + agent.getName());
             }
         } finally {
             checkAndRemoveStuckItems(agent, queueProvider);
         }
     }
 
-    public void offer(ReplicationPackage replicationPackage, ReplicationAgent agent,
-                    ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
-        ReplicationQueue queue = queueProvider.getOrCreateDefaultQueue(agent);
+    public boolean offer(ReplicationPackage replicationPackage, ReplicationAgent agent,
+                         ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
+        boolean added;
+        ReplicationQueue queue = queueProvider.getDefaultQueue(agent);
         if (queue != null) {
-            queue.add(replicationPackage);
+            added = queue.add(replicationPackage);
         } else {
             throw new ReplicationQueueException("could not get a queue for agent "
-                            + agent.getName());
+                    + agent.getName());
         }
         checkAndRemoveStuckItems(agent, queueProvider);
+        return added;
     }
 
     private void checkAndRemoveStuckItems(ReplicationAgent agent,
-                    ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
-        ReplicationQueue defaultQueue = queueProvider.getOrCreateDefaultQueue(agent);
+                                          ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
+        ReplicationQueue defaultQueue = queueProvider.getDefaultQueue(agent);
         // get first item in the queue with its status
         ReplicationPackage firstItem = defaultQueue.getHead();
         if (firstItem != null) {
@@ -128,15 +129,20 @@ public class ErrorAwareQueueDistribution
             // if item is still in the queue after a max no. of attempts, move it to the error queue
             int attempts = status.getAttempts();
             if (log.isInfoEnabled()) {
-                log.info("attemps for item {}: {}", firstItem.getId(), attempts);
+                log.info("attempts for item {}: {}", firstItem.getId(), attempts);
             }
             if (attempts > attemptsThreshold) {
                 if (ERROR.equals(stuckQueueHandling)) {
                     if (log.isWarnEnabled()) {
                         log.warn("item moved to the error queue");
                     }
-                    ReplicationQueue errorQueue = queueProvider.getOrCreateQueue(agent, "-error");
-                    errorQueue.add(firstItem);
+                    ReplicationQueue errorQueue = queueProvider.getQueue(agent, "-error");
+                    if (!errorQueue.add(firstItem)) {
+                        if (log.isErrorEnabled()) {
+                            log.error("failed to move item {} the queue {}", firstItem, errorQueue);
+                        }
+                        throw new ReplicationQueueException("could not move an item to the error queue");
+                    }
                 }
                 if (log.isWarnEnabled()) {
                     log.warn("item dropped from the default queue");

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java?rev=1549567&r1=1549566&r2=1549567&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java Mon Dec  9 13:37:48 2013
@@ -57,7 +57,7 @@ public class PriorityPathDistributionStr
     private String[] priorityPaths;
 
     @Activate
-    protected void activate(ComponentContext context) throws Exception {
+    protected void activate(ComponentContext context) {
         priorityPaths = PropertiesUtil.toStringArray(context.getProperties().get(PRIORITYPATHS));
     }
 
@@ -85,7 +85,7 @@ public class PriorityPathDistributionStr
                     log.error("could not add the item to the queue {}", queue);
                 }
                 state.setItemState(ItemState.ERROR);
-                state.setSuccessfull(false);
+                state.setSuccessful(false);
             }
             return state;
         } else {
@@ -121,21 +121,21 @@ public class PriorityPathDistributionStr
             if (log.isInfoEnabled()) {
                 log.info("using priority queue for path {}", pp);
             }
-            queue = queueProvider.getOrCreateQueue(agent, pp);
+            queue = queueProvider.getQueue(agent, pp);
         } else {
             if (log.isInfoEnabled()) {
                 log.info("using default queue");
             }
-            queue = queueProvider.getOrCreateDefaultQueue(agent);
+            queue = queueProvider.getDefaultQueue(agent);
         }
         return queue;
     }
 
-    public void offer(ReplicationPackage replicationPackage, ReplicationAgent agent,
-                    ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
+    public boolean offer(ReplicationPackage replicationPackage, ReplicationAgent agent,
+                         ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
         ReplicationQueue queue = getQueue(replicationPackage, agent, queueProvider);
         if (queue != null) {
-            queue.add(replicationPackage);
+            return queue.add(replicationPackage);
         } else {
             throw new ReplicationQueueException("could not get a queue for agent "
                             + agent.getName());

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java?rev=1549567&r1=1549566&r2=1549567&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java Mon Dec  9 13:37:48 2013
@@ -35,7 +35,7 @@ import org.apache.sling.replication.seri
 
 /**
  * The default strategy for delivering packages to queues. Each agent just manages a single queue,
- * no failure / stuck handling where each pacakge is put regardless of anything.
+ * no failure / stuck handling where each package is put regardless of anything.
  */
 @Component(immediate = true)
 @Service(value = ReplicationQueueDistributionStrategy.class)
@@ -53,7 +53,7 @@ public class SingleQueueDistributionStra
             log.info("using single queue distribution");
         }
         ReplicationQueueItemState state = new ReplicationQueueItemState();
-        ReplicationQueue queue = queueProvider.getOrCreateDefaultQueue(agent);
+        ReplicationQueue queue = queueProvider.getDefaultQueue(agent);
         if (log.isInfoEnabled()) {
             log.info("obtained queue {}", queue);
         }
@@ -68,7 +68,7 @@ public class SingleQueueDistributionStra
                     log.error("could not add the item to the queue {}", queue);
                 }
                 state.setItemState(ItemState.ERROR);
-                state.setSuccessfull(false);
+                state.setSuccessful(false);
             }
             return state;
         } else {
@@ -78,11 +78,11 @@ public class SingleQueueDistributionStra
 
     }
 
-    public void offer(ReplicationPackage replicationPackage, ReplicationAgent agent,
-                    ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
-        ReplicationQueue queue = queueProvider.getOrCreateDefaultQueue(agent);
+    public boolean offer(ReplicationPackage replicationPackage, ReplicationAgent agent,
+                         ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
+        ReplicationQueue queue = queueProvider.getDefaultQueue(agent);
         if (queue != null) {
-            queue.add(replicationPackage);
+            return queue.add(replicationPackage);
         } else {
             throw new ReplicationQueueException("could not get a queue for agent "
                             + agent.getName());

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java?rev=1549567&r1=1549566&r2=1549567&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java Mon Dec  9 13:37:48 2013
@@ -39,7 +39,7 @@ import org.apache.sling.replication.queu
 import org.apache.sling.replication.serialization.ReplicationPackage;
 
 /**
- * a {@link ReplicationQueue} based on Sling Job Handling facilities
+ * a {@link ReplicationQueue} based on Sling Job Handling facilities
  */
 public class JobHandlingReplicationQueue implements ReplicationQueue {
 
@@ -47,11 +47,11 @@ public class JobHandlingReplicationQueue
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private String name;
+    private final String name;
 
-    private String topic;
+    private final String topic;
 
-    private JobManager jobManager;
+    private final JobManager jobManager;
 
     protected JobHandlingReplicationQueue(String name, String topic, JobManager jobManager) {
         this.name = name;
@@ -59,6 +59,10 @@ public class JobHandlingReplicationQueue
         this.jobManager = jobManager;
     }
 
+    public String getName() {
+        return name;
+    }
+
     public boolean add(ReplicationPackage replicationPackage) {
         boolean result = true;
         try {
@@ -117,8 +121,6 @@ public class JobHandlingReplicationQueue
         Job firstItem = getFirstItem();
         if (firstItem != null) {
             jobManager.removeJobById(firstItem.getId());
-        } else {
-
         }
     }
 
@@ -150,7 +152,9 @@ public class JobHandlingReplicationQueue
     }
 
     public boolean isEmpty() {
-        return jobManager.getQueue(name).getStatistics().getNumberOfJobs() == 0;
+        // TODO : fix this
+//        return jobManager.getQueue(name).getStatistics().getNumberOfJobs() == 0;
+        return false;
     }
 
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java?rev=1549567&r1=1549566&r2=1549567&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java Mon Dec  9 13:37:48 2013
@@ -54,13 +54,11 @@ public class JobHandlingReplicationQueue
     private ConfigurationAdmin configAdmin;
 
     @Override
-    protected ReplicationQueue createQueue(ReplicationAgent agent, String queueName)
+    protected ReplicationQueue getOrCreateQueue(ReplicationAgent agent, String queueName)
                     throws ReplicationQueueException {
         try {
-            String name = new StringBuilder(agent.getName()).append(queueName).toString();
-            String topic = new StringBuilder(
-                            JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC).append('/')
-                            .append(agent.getName()).append(queueName).toString();
+            String name = agent.getName() + queueName;
+            String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + '/' + agent.getName() + queueName;
             if (jobManager.getQueue(name) == null) {
                 Configuration config = configAdmin.createFactoryConfiguration(
                                 QueueConfiguration.class.getName(), null);

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java?rev=1549567&r1=1549566&r2=1549567&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java Mon Dec  9 13:37:48 2013
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.commons.io.IOUtils;
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.event.jobs.ScheduledJobInfo;
@@ -55,8 +56,7 @@ public class JobHandlingUtils {
     public static Job getJob(ScheduledJobInfo info, JobManager jobManager, String topic) {
         String id = String.valueOf(info.getJobProperties().get(ID));
         Map<String, Object> jobProps = JobHandlingUtils.createIdPropertiesFromId(id);
-        Job job = jobManager.getJob(topic, jobProps);
-        return job;
+        return jobManager.getJob(topic, jobProps);
     }
 
     public static ReplicationPackage getPackage(ReplicationPackageBuilder packageBuilder,
@@ -111,9 +111,11 @@ public class JobHandlingUtils {
             }
 
             public InputStream getInputStream() throws IOException {
-                // TODO : eventually re-enable it once SLING-3140 gets released
+                // TODO : use this once SLING-3140 gets released
                 // return new ByteArrayInputStream((byte[]) job.getProperty(BIN));
-                return null;
+
+                // workaround to make void package work while we get SLING-3140 to be released
+                return IOUtils.toInputStream(String.valueOf(job.getProperty(ID)));
             }
 
             public String getId() {
@@ -132,7 +134,7 @@ public class JobHandlingUtils {
     }
 
     public static Map<String, Object> createFullPropertiesFromPackage(
-                    ReplicationPackage replicationPackage) throws IOException {
+                    ReplicationPackage replicationPackage) {
         Map<String, Object> properties = new HashMap<String, Object>();
         properties.put(ID, replicationPackage.getId());
         properties.put(PATHS, replicationPackage.getPaths());

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessor.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessor.java?rev=1549567&r1=1549566&r2=1549567&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessor.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessor.java Mon Dec  9 13:37:48 2013
@@ -19,21 +19,18 @@
 package org.apache.sling.replication.queue.impl.simple;
 
 import java.util.Arrays;
-
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Properties;
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferencePolicy;
 import org.apache.felix.scr.annotations.Service;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.sling.replication.agent.AgentReplicationException;
 import org.apache.sling.replication.queue.ReplicationQueue;
-import org.apache.sling.replication.queue.ReplicationQueueException;
 import org.apache.sling.replication.queue.ReplicationQueueProvider;
 import org.apache.sling.replication.serialization.ReplicationPackage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * a simple scheduled {@link SimpleReplicationQueue}s processor
@@ -80,7 +77,7 @@ public class ScheduledReplicationQueuePr
                     // }
                 }
             }
-        } catch (ReplicationQueueException e) {
+        } catch (Exception e) {
             if (log.isErrorEnabled()) {
                 log.error("error while processing queue {}", e);
             }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java?rev=1549567&r1=1549566&r2=1549567&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java Mon Dec  9 13:37:48 2013
@@ -41,23 +41,30 @@ import org.apache.sling.replication.seri
  */
 public class SimpleReplicationQueue implements ReplicationQueue {
 
-    private Logger log = LoggerFactory.getLogger(getClass());
+    private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private ReplicationAgent agent;
+    private final ReplicationAgent agent;
 
-    private BlockingQueue<ReplicationPackage> queue;
+    private final String name;
 
-    private Map<ReplicationPackage, ReplicationQueueItemState> statusMap;
+    private final BlockingQueue<ReplicationPackage> queue;
 
-    public SimpleReplicationQueue(ReplicationAgent agent) {
+    private final Map<ReplicationPackage, ReplicationQueueItemState> statusMap;
+
+    public SimpleReplicationQueue(ReplicationAgent agent, String name) {
         if (log.isInfoEnabled()) {
             log.info("starting a simple queue for agent {}", agent.getName());
         }
         this.agent = agent;
+        this.name = name;
         this.queue = new LinkedBlockingQueue<ReplicationPackage>();
         this.statusMap = new WeakHashMap<ReplicationPackage, ReplicationQueueItemState>(10);
     }
 
+    public String getName() {
+        return name;
+    }
+
     public boolean add(ReplicationPackage replicationPackage) {
         ReplicationQueueItemState status = new ReplicationQueueItemState();
         boolean result = false;
@@ -65,7 +72,7 @@ public class SimpleReplicationQueue impl
             result = queue.offer(replicationPackage, 10, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
             log.error("cannot add an item to the queue", e);
-            status.setSuccessfull(false);
+            status.setSuccessful(false);
         } finally {
             statusMap.put(replicationPackage, status);
         }
@@ -97,7 +104,7 @@ public class SimpleReplicationQueue impl
 
     public void removeHead() {
         ReplicationPackage element = queue.remove();
-        statusMap.get(element).setSuccessfull(true);
+        statusMap.get(element).setSuccessful(true);
     }
 
     public boolean isEmpty() {

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java?rev=1549567&r1=1549566&r2=1549567&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java Mon Dec  9 13:37:48 2013
@@ -40,9 +40,9 @@ public class SimpleReplicationQueueProvi
 
     public static final String NAME = "simple";
 
-    protected ReplicationQueue createQueue(ReplicationAgent agent, String selector)
+    protected ReplicationQueue getOrCreateQueue(ReplicationAgent agent, String selector)
                     throws ReplicationQueueException {
-        return new SimpleReplicationQueue(agent);
+        return new SimpleReplicationQueue(agent, selector);
     }
 
     protected void deleteQueue(ReplicationQueue queue) throws ReplicationQueueException {

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/ReplicationRule.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/ReplicationRule.java?rev=1549567&r1=1549566&r2=1549567&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/ReplicationRule.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/ReplicationRule.java Mon Dec  9 13:37:48 2013
@@ -33,16 +33,25 @@ public interface ReplicationRule {
     String getSignature();
 
     /**
+     * checks if the given rule <code>String</code> matches this {@link ReplicationRule}'s signature
+     *
+     * @param ruleString a rule <code>String</code>
+     * @return <code>true</code> if the given rule <code>String</code> matches this rule's signature, <code>false</code> otherwise
+     */
+    boolean signatureMatches(String ruleString);
+
+    /**
      * apply this rule to a replication agent
      *
-     * @param {@link ReplicationAgent agent} the agent to apply the rule to
+     * @param ruleString the rule to apply to the agent
+     * @param agent      {@link ReplicationAgent agent} the agent to apply the rule to
      */
-    void apply(ReplicationAgent agent);
+    void apply(String ruleString, ReplicationAgent agent);
 
     /**
      * undo the application of this rule to the given agent
      *
      * @param agent the {@link ReplicationAgent agent} on which undoing this rule application
      */
-    void unapply(ReplicationAgent agent);
+    void undo(String ruleString, ReplicationAgent agent);
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/ReplicationRuleEngine.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/ReplicationRuleEngine.java?rev=1549567&r1=1549566&r2=1549567&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/ReplicationRuleEngine.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/ReplicationRuleEngine.java Mon Dec  9 13:37:48 2013
@@ -29,9 +29,18 @@ public interface ReplicationRuleEngine {
      * reads a <code>String</code>, transforms it into a {@link ReplicationRule} and applies it to the
      * given {@link ReplicationAgent}
      *
-     * @param ruleString the <code>String</code> to be read as a {@link ReplicationRule}
-     * @param agent      the {@link ReplicationAgent} to apply the rule to
+     * @param agent       the {@link ReplicationAgent} to apply the rule to
+     * @param ruleStrings the <code>String</code> array to be read as a {@link ReplicationRule}s
      */
-    void applyRule(String ruleString, ReplicationAgent agent);
+    void applyRules(ReplicationAgent agent, String... ruleStrings);
+
+    /**
+     * reads a <code>String</code>, transforms it into a {@link ReplicationRule} and un-applies it to the
+     * given {@link ReplicationAgent}
+     *
+     * @param agent       the {@link ReplicationAgent} to un-apply the rule to
+     * @param ruleStrings the <code>String</code> array to be read as a {@link ReplicationRule}s
+     */
+    void unapplyRules(ReplicationAgent agent, String... ruleStrings);
 
 }

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ChainReplicateReplicationRule.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ChainReplicateReplicationRule.java?rev=1549567&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ChainReplicateReplicationRule.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ChainReplicateReplicationRule.java Mon Dec  9 13:37:48 2013
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.rule.impl;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.replication.agent.AgentReplicationException;
+import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.communication.ReplicationActionType;
+import org.apache.sling.replication.communication.ReplicationRequest;
+import org.apache.sling.replication.event.ReplicationEvent;
+import org.apache.sling.replication.event.ReplicationEventType;
+import org.apache.sling.replication.rule.ReplicationRule;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * a rule for triggering a chain replication upon replication package installation {@link ReplicationEventType.PACKAGE_INSTALLED}
+ */
+@Component(immediate = true)
+@Service(value = ReplicationRule.class)
+public class ChainReplicateReplicationRule implements ReplicationRule {
+
+    private static final String PREFIX = "chain on path:";
+    private static final String SIGNATURE = "chain on path: ${path}";
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private BundleContext bundleContext;
+    private final Map<String, ServiceRegistration> registrations = new HashMap<String, ServiceRegistration>();
+
+    @Activate
+    protected void activate(BundleContext bundleContext) {
+        this.bundleContext = bundleContext;
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        for (Map.Entry<String, ServiceRegistration> entry : registrations.entrySet()) {
+            if (entry.getValue() != null) {
+                entry.getValue().unregister();
+            }
+        }
+        registrations.clear();
+    }
+
+    public String getSignature() {
+        return SIGNATURE;
+    }
+
+    public void apply(String ruleString, ReplicationAgent agent) {
+        if (signatureMatches(ruleString)) {
+            // register an event handler on replication package install (on a certain path) which triggers the chain replication of that same package
+            Dictionary<String, Object> properties = new Hashtable<String, Object>();
+            properties.put(EventConstants.EVENT_TOPIC, ReplicationEvent.getTopic(ReplicationEventType.PACKAGE_INSTALLED));
+            String path = ruleString.substring(ruleString.indexOf(':') + 1).trim();
+            if (log.isInfoEnabled()) {
+                log.info("agent {} will chain replication on path '{}'", agent.getName(), path);
+            }
+//            properties.put(EventConstants.EVENT_FILTER, "(path=" + path + "/*)");
+            if (bundleContext != null) {
+                ServiceRegistration triggerPathEventRegistration = bundleContext.registerService(EventHandler.class.getName(), new TriggerAgentEventListener(agent, path), properties);
+                registrations.put(agent.getName() + ruleString, triggerPathEventRegistration);
+            } else {
+                if (log.isErrorEnabled()) {
+                    log.error("cannot register trigger since bundle context is null");
+                }
+            }
+        } else {
+            if (log.isWarnEnabled()) {
+                log.warn("rule {} doesn't match signature: {}", ruleString, SIGNATURE);
+            }
+        }
+
+    }
+
+    public boolean signatureMatches(String ruleString) {
+        return ruleString.startsWith(PREFIX) && ruleString.substring(PREFIX.length() + 1).matches("\\s*(\\/\\w+)+");
+    }
+
+    public void undo(String ruleString, ReplicationAgent agent) {
+        if (signatureMatches(ruleString)) {
+            ServiceRegistration serviceRegistration = registrations.get(agent.getName() + ruleString);
+            if (serviceRegistration != null) {
+                serviceRegistration.unregister();
+            }
+        } else {
+            if (log.isWarnEnabled()) {
+                log.warn("rule {} doesn't match signature: {}", ruleString, SIGNATURE);
+            }
+        }
+    }
+
+
+    private class TriggerAgentEventListener implements EventHandler {
+
+        private final ReplicationAgent agent;
+        private final String path;
+
+        public TriggerAgentEventListener(ReplicationAgent agent, String path) {
+            this.agent = agent;
+            this.path = path;
+        }
+
+        public void handleEvent(Event event) {
+            Object actionProperty = event.getProperty("replication.action");
+            Object pathProperty = event.getProperty("replication.path");
+            if (actionProperty != null && pathProperty != null) {
+                String[] paths = (String[]) pathProperty;
+                for (String p : paths) {
+                    if (p.startsWith(path)) {
+                        if (log.isInfoEnabled()) {
+                            log.info("triggering chain replication from event {}", event);
+                        }
+                        ReplicationActionType action = ReplicationActionType.valueOf(String.valueOf(actionProperty));
+                        try {
+                            agent.send(new ReplicationRequest(System.currentTimeMillis(), action, paths));
+                        } catch (AgentReplicationException e) {
+                            if (log.isErrorEnabled()) {
+                                log.error("triggered replication resulted in an error {}", e);
+                            }
+                        }
+                        break;
+                    }
+                }
+            }
+        }
+    }
+}

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/DefaultReplicationRuleEngine.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/DefaultReplicationRuleEngine.java?rev=1549567&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/DefaultReplicationRuleEngine.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/DefaultReplicationRuleEngine.java Mon Dec  9 13:37:48 2013
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.rule.impl;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Map;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.felix.scr.annotations.References;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.rule.ReplicationRule;
+import org.apache.sling.replication.rule.ReplicationRuleEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * default implementation of {@link ReplicationRuleEngine}
+ */
+@Component
+@References({
+        @Reference(name = "replicationRule",
+                referenceInterface = ReplicationRule.class,
+                cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE,
+                policy = ReferencePolicy.DYNAMIC,
+                bind = "bindReplicationRule",
+                unbind = "unbindReplicationRule")
+})
+@Service(value = ReplicationRuleEngine.class)
+public class DefaultReplicationRuleEngine implements ReplicationRuleEngine {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final Collection<ReplicationRule> replicationRules = new LinkedList<ReplicationRule>();
+
+    @Deactivate
+    protected void deactivate() {
+        replicationRules.clear();
+    }
+
+    protected void bindReplicationRule(final ReplicationRule replicationRule,
+                                       Map<String, Object> properties) {
+        synchronized (replicationRules) {
+            replicationRules.add(replicationRule);
+        }
+        log.debug("Registering replication rule {} ", replicationRule);
+    }
+
+    protected void unbindReplicationRule(final ReplicationRule replicationRule,
+                                         Map<String, Object> properties) {
+        synchronized (replicationRules) {
+            replicationRules.remove(replicationRule);
+        }
+        log.debug("Unregistering replication rule {} ", replicationRule);
+    }
+
+    public void applyRules(ReplicationAgent agent, String... ruleStrings) {
+        for (String ruleString : ruleStrings) {
+
+            for (ReplicationRule rule : replicationRules) {
+                if (rule.signatureMatches(ruleString)) {
+                    if (log.isInfoEnabled()) {
+                        log.info("applying rule {} with string {} to agent {}", new Object[]{rule, ruleString, agent});
+                    }
+                    rule.apply(ruleString, agent);
+                    break;
+                }
+            }
+
+        }
+    }
+
+    public void unapplyRules(ReplicationAgent agent, String... ruleStrings) {
+        for (String ruleString : ruleStrings) {
+
+            for (ReplicationRule rule : replicationRules) {
+                if (rule.signatureMatches(ruleString)) {
+                    if (log.isInfoEnabled()) {
+                        log.info("un-applying rule {} with string {} to agent {}", new Object[]{rule, ruleString, agent});
+                    }
+                    rule.undo(ruleString, agent);
+                    break;
+                }
+            }
+
+        }
+    }
+
+}

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/TriggerPathReplicationRule.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/TriggerPathReplicationRule.java?rev=1549567&r1=1549566&r2=1549567&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/TriggerPathReplicationRule.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/TriggerPathReplicationRule.java Mon Dec  9 13:37:48 2013
@@ -18,30 +18,133 @@
  */
 package org.apache.sling.replication.rule.impl;
 
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.api.SlingConstants;
+import org.apache.sling.replication.agent.AgentReplicationException;
 import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.communication.ReplicationActionType;
+import org.apache.sling.replication.communication.ReplicationRequest;
 import org.apache.sling.replication.rule.ReplicationRule;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * a rule for triggering a specific agent upon node / properties being changed under a certain path
  */
+@Component(immediate = true)
+@Service(value = ReplicationRule.class)
 public class TriggerPathReplicationRule implements ReplicationRule {
 
-    private final String path;
+    //    private static final Pattern p = Pattern.compile("(\\/\\w+)+\\/?");
+    private static final String PREFIX = "trigger on path:";
+    private static final String SIGNATURE = "trigger on path: ${path}";
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private BundleContext bundleContext;
+    private final Map<String, ServiceRegistration> registrations = new HashMap<String, ServiceRegistration>();
+
+    @Activate
+    protected void activate(BundleContext bundleContext) {
+        this.bundleContext = bundleContext;
+    }
 
-    public TriggerPathReplicationRule(String path) {
-        this.path = path;
+    @Deactivate
+    protected void deactivate() {
+        for (Map.Entry<String, ServiceRegistration> entry : registrations.entrySet()) {
+            if (entry.getValue() != null) {
+                entry.getValue().unregister();
+            }
+        }
+        registrations.clear();
     }
 
     public String getSignature() {
-        return "trigger on path: ${path}";
+        return SIGNATURE;
+    }
+
+    public void apply(String ruleString, ReplicationAgent agent) {
+        if (signatureMatches(ruleString)) {
+            // register an event handler on path which triggers the agent on node / property changes / addition / removals
+            Dictionary<String, Object> properties = new Hashtable<String, Object>();
+            properties.put(EventConstants.EVENT_TOPIC, new String[]{SlingConstants.TOPIC_RESOURCE_ADDED,
+                    SlingConstants.TOPIC_RESOURCE_CHANGED, SlingConstants.TOPIC_RESOURCE_REMOVED});
+            String path = ruleString.substring(ruleString.indexOf(':') + 1).trim();
+            if (log.isInfoEnabled()) {
+                log.info("trigger agent {} on path '{}'", agent.getName(), path);
+            }
+            properties.put(EventConstants.EVENT_FILTER, "(path=" + path + "/*)");
+            if (bundleContext != null) {
+                ServiceRegistration triggerPathEventRegistration = bundleContext.registerService(EventHandler.class.getName(), new TriggerAgentEventListener(agent), properties);
+                registrations.put(agent.getName() + ruleString, triggerPathEventRegistration);
+            } else {
+                if (log.isErrorEnabled()) {
+                    log.error("cannot register trigger since bundle context is null");
+                }
+            }
+        } else {
+            if (log.isWarnEnabled()) {
+                log.warn("rule {} doesn't match signature: {}", ruleString, SIGNATURE);
+            }
+        }
+
     }
 
-    public void apply(ReplicationAgent agent) {
-        // TODO : register an event handler on path which triggers the agent on node / property changes
+    public boolean signatureMatches(String ruleString) {
+        return ruleString.startsWith(PREFIX) && ruleString.substring(PREFIX.length() + 1).matches("\\s*(\\/\\w+)+");
+    }
 
+    public void undo(String ruleString, ReplicationAgent agent) {
+        if (signatureMatches(ruleString)) {
+            ServiceRegistration serviceRegistration = registrations.get(agent.getName() + ruleString);
+            if (serviceRegistration != null) {
+                serviceRegistration.unregister();
+            }
+        }
+        else {
+            if (log.isWarnEnabled()) {
+                log.warn("rule {} doesn't match signature: {}", ruleString, SIGNATURE);
+            }
+        }
     }
 
-    public void unapply(ReplicationAgent agent) {
-        // TODO : unregister the event handler for the given agent on that path
+
+    private class TriggerAgentEventListener implements EventHandler {
+
+        private final ReplicationAgent agent;
+
+        public TriggerAgentEventListener(ReplicationAgent agent) {
+            this.agent = agent;
+        }
+
+        public void handleEvent(Event event) {
+            ReplicationActionType action = SlingConstants.TOPIC_RESOURCE_REMOVED.equals(event.getTopic()) ? ReplicationActionType.DELETE : ReplicationActionType.ADD;
+            if (log.isInfoEnabled()) {
+                log.info("triggering replication from event {}", event);
+            }
+            Object eventProperty = event.getProperty("path");
+            if (eventProperty != null) {
+                String replicatingPath = String.valueOf(eventProperty);
+                try {
+                    agent.send(new ReplicationRequest(System.currentTimeMillis(), action, replicatingPath));
+                } catch (AgentReplicationException e) {
+                    if (log.isErrorEnabled()) {
+                        log.error("triggered replication resulted in an error {}", e);
+                    }
+                }
+            }
+        }
     }
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackage.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackage.java?rev=1549567&r1=1549566&r2=1549567&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackage.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackage.java Mon Dec  9 13:37:48 2013
@@ -27,16 +27,41 @@ import java.io.Serializable;
  */
 public interface ReplicationPackage extends Serializable {
 
+    /**
+     * get package id
+     * @return the package id as a <code>String</code>
+     */
     String getId();
 
+    /**
+     * get the paths covered by this package
+     * @return an array of <code>String</code> paths
+     */
     String[] getPaths();
 
+    /**
+     * get the action this package is used for
+     * @return the action as a <code>String</code>
+     */
     String getAction();
 
+    /**
+     * get the type of package
+     * @return the package type as a <code>String</code>
+     */
     String getType();
 
+    /**
+     * get package stream
+     * @return an {@link InputStream}
+     * @throws IOException
+     */
     InputStream getInputStream() throws IOException;
 
+    /**
+     * get package stream length
+     * @return the package length as a <code>long</code>
+     */
     long getLength();
 
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilder.java?rev=1549567&r1=1549566&r2=1549567&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilder.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilder.java Mon Dec  9 13:37:48 2013
@@ -25,13 +25,34 @@ import org.apache.sling.replication.comm
 /**
  * A builder for {@link ReplicationPackage}s
  */
-// TODO : add context
 public interface ReplicationPackageBuilder {
-    ReplicationPackage createPackage(ReplicationRequest request)
-                    throws ReplicationPackageBuildingException;
+    /**
+     * creates a {@link ReplicationPackage} for a specific {@link ReplicationRequest}
+     *
+     * @param request the {@link ReplicationRequest} to create the package for
+     * @return a {@link ReplicationPackage}
+     * @throws ReplicationPackageBuildingException
+     *
+     */
+    ReplicationPackage createPackage(ReplicationRequest request) throws ReplicationPackageBuildingException;
 
-    ReplicationPackage readPackage(ReplicationRequest request, InputStream stream, boolean install)
-                    throws ReplicationPackageBuildingException;
+    /**
+     * reads a stream and tries to convert it to a {@link ReplicationPackage} this provider can read and install
+     *
+     * @param stream  the {@link InputStream} of the package to read
+     * @param install if <code>true</code> then if the package can be read from the stream then it will try also
+     *                to install it into the repository
+     * @return a {@link ReplicationPackage} if it can read it from the stream
+     * @throws ReplicationPackageReadingException
+     *          when the stream cannot be read as a {@link ReplicationPackage}
+     */
+    ReplicationPackage readPackage(InputStream stream, boolean install) throws ReplicationPackageReadingException;
 
+    /**
+     * get an already created (and saved into the repository) {@link ReplicationPackage} by its id
+     *
+     * @param id a <code>String</code> representing the unique identifier of an already created {@link ReplicationPackage}
+     * @return a {@link ReplicationPackage} if one with such an id exists, <code>null</code> otherwise
+     */
     ReplicationPackage getPackage(String id);
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilderProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilderProvider.java?rev=1549567&r1=1549566&r2=1549567&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilderProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilderProvider.java Mon Dec  9 13:37:48 2013
@@ -25,8 +25,19 @@ import java.util.Collection;
  */
 public interface ReplicationPackageBuilderProvider {
 
-    Collection<ReplicationPackageBuilder> getAvailableReplicationPacakageBuilders();
+    /**
+     * get all the available {@link ReplicationPackageBuilder}s in the system
+     *
+     * @return a {@link Collection} of {@link ReplicationPackageBuilder}
+     */
+    Collection<ReplicationPackageBuilder> getAvailableReplicationPackageBuilders();
 
-    ReplicationPackageBuilder getReplicationPacakageBuilder(String name);
+    /**
+     * get the {@link ReplicationPackageBuilder} with the specified name
+     *
+     * @param name the name of the {@link ReplicationPackageBuilder} as a <code>String</code>
+     * @return the {@link ReplicationPackageBuilder} with the given name, if it exists, <code>null</code> otherwise
+     */
+    ReplicationPackageBuilder getReplicationPackageBuilder(String name);
 
 }

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageReadingException.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageReadingException.java?rev=1549567&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageReadingException.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageReadingException.java Mon Dec  9 13:37:48 2013
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.serialization;
+
+/**
+ * This represents an error happening while reading / installing a
+ * {@link org.apache.sling.replication.serialization.ReplicationPackage}
+ */
+@SuppressWarnings("serial")
+public class ReplicationPackageReadingException extends Exception {
+
+    public ReplicationPackageReadingException(String message) {
+        super(message);
+    }
+
+    public ReplicationPackageReadingException(Throwable t) {
+        super(t);
+    }
+}

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java?rev=1549567&r1=1549566&r2=1549567&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java Mon Dec  9 13:37:48 2013
@@ -18,54 +18,108 @@
  */
 package org.apache.sling.replication.serialization.impl;
 
+import java.io.BufferedInputStream;
 import java.io.InputStream;
-
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
 import org.apache.sling.replication.communication.ReplicationActionType;
 import org.apache.sling.replication.communication.ReplicationRequest;
 import org.apache.sling.replication.serialization.ReplicationPackage;
 import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
 import org.apache.sling.replication.serialization.ReplicationPackageBuildingException;
+import org.apache.sling.replication.serialization.ReplicationPackageReadingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * base abstract implementation of a {@link ReplicationPackageBuilder}
+ */
 public abstract class AbstractReplicationPackageBuilder implements ReplicationPackageBuilder {
 
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
     public ReplicationPackage createPackage(ReplicationRequest request)
-                    throws ReplicationPackageBuildingException {
-        ReplicationPackage replicationPackage = null;
-        if (ReplicationActionType.ACTIVATE.equals(request.getAction())) {
-            replicationPackage = createPackageForActivation(request);
-        } else if (ReplicationActionType.DEACTIVATE.equals(request.getAction())) {
-            replicationPackage = createPackageForDeactivation(request);
+            throws ReplicationPackageBuildingException {
+        ReplicationPackage replicationPackage;
+        if (ReplicationActionType.ADD.equals(request.getAction())) {
+            replicationPackage = createPackageForAdd(request);
+        } else if (ReplicationActionType.DELETE.equals(request.getAction())) {
+            replicationPackage = new VoidReplicationPackage(request, getName());
         } else {
             throw new ReplicationPackageBuildingException("unknown action type "
-                            + request.getAction());
+                    + request.getAction());
         }
         return replicationPackage;
     }
 
-    protected abstract ReplicationPackage createPackageForDeactivation(
-                    final ReplicationRequest request);
+    protected abstract ReplicationPackage createPackageForAdd(ReplicationRequest request)
+            throws ReplicationPackageBuildingException;
 
-    protected abstract ReplicationPackage createPackageForActivation(ReplicationRequest request)
-                    throws ReplicationPackageBuildingException;
-
-    public ReplicationPackage readPackage(ReplicationRequest request, InputStream stream,
-                    boolean install) throws ReplicationPackageBuildingException {
+    public ReplicationPackage readPackage(InputStream stream,
+                                          boolean install) throws ReplicationPackageReadingException {
         ReplicationPackage replicationPackage = null;
-        if (ReplicationActionType.ACTIVATE.equals(request.getAction())) {
-            replicationPackage = readPackageForActivation(request, stream, install);
-        } else if (ReplicationActionType.DEACTIVATE.equals(request.getAction())) {
-            replicationPackage = readPackageForDeactivation(request, stream, install);
-        } else {
-            throw new ReplicationPackageBuildingException("unknown action type "
-                            + request.getAction());
+        if (!stream.markSupported()) {
+            stream = new BufferedInputStream(stream);
+        }
+        try {
+            stream.mark(6);
+            byte[] buffer = new byte[6];
+            int bytesRead = stream.read(buffer, 0, 6);
+            stream.reset();
+            String s = new String(buffer);
+            if (log.isInfoEnabled()) {
+                log.info("read {} bytes as {}", bytesRead, s);
+            }
+            if (bytesRead > 0 && buffer[0] > 0 && s.startsWith("DEL")) {
+                replicationPackage = readPackageForDelete(stream);
+            }
+        } catch (Exception e) {
+            if (log.isWarnEnabled()) {
+                log.warn("{}", e);
+            }
+        }
+        stream.mark(-1);
+        if (replicationPackage == null) {
+            replicationPackage = readPackageForAdd(stream, install);
         }
         return replicationPackage;
     }
 
-    protected abstract ReplicationPackage readPackageForDeactivation(ReplicationRequest request,
-                    InputStream stream, boolean install) throws ReplicationPackageBuildingException;
+    private ReplicationPackage readPackageForDelete(InputStream stream) throws ReplicationPackageReadingException {
+        ReplicationPackage replicationPackage = null;
+        Session session = null;
+        try {
+            VoidReplicationPackage voidReplicationPackage = VoidReplicationPackage.fromStream(stream);
+            if (voidReplicationPackage != null) {
+                session = getSession();
+                if (session != null) {
+                    for (String path : voidReplicationPackage.getPaths()) {
+                        if (session.itemExists(path)) {
+                            session.removeItem(path);
+                        }
+                    }
+                    session.save();
+                    ReplicationRequest request = new ReplicationRequest(System.currentTimeMillis(),
+                            ReplicationActionType.DELETE, voidReplicationPackage.getPaths());
+                    replicationPackage = new VoidReplicationPackage(request, getName());
+                }
+            }
+            return replicationPackage;
+        } catch (Exception e) {
+            throw new ReplicationPackageReadingException(e);
+        } finally {
+            if (session != null) {
+                session.logout();
+            }
+        }
+
+    }
+
+    protected abstract String getName();
+
+    protected abstract Session getSession() throws RepositoryException;
 
-    protected abstract ReplicationPackage readPackageForActivation(ReplicationRequest request,
-                    InputStream stream, boolean install) throws ReplicationPackageBuildingException;
+    protected abstract ReplicationPackage readPackageForAdd(InputStream stream, boolean install)
+            throws ReplicationPackageReadingException;
 
 }

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageBuilderProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageBuilderProvider.java?rev=1549567&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageBuilderProvider.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageBuilderProvider.java Mon Dec  9 13:37:48 2013
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.serialization.impl;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.felix.scr.annotations.References;
+import org.apache.felix.scr.annotations.Service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
+import org.apache.sling.replication.serialization.ReplicationPackageBuilderProvider;
+
+/**
+ * Default implementation of {@link org.apache.sling.replication.agent.ReplicationAgentsManager}
+ */
+@Component
+@References({ 
+    @Reference(name = "replicationPackageBuilder", 
+               referenceInterface = ReplicationPackageBuilder.class, 
+               cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE, 
+               policy = ReferencePolicy.DYNAMIC,
+               bind = "bindReplicationPackageBuilder", 
+               unbind = "unbindReplicationPackageBuilder")
+    })
+@Service(value = ReplicationPackageBuilderProvider.class)
+public class DefaultReplicationPackageBuilderProvider implements ReplicationPackageBuilderProvider {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final Map<String, ReplicationPackageBuilder> replicationPackageBuilders = new HashMap<String, ReplicationPackageBuilder>();
+
+    @Deactivate
+    protected void deactivate() {
+        replicationPackageBuilders.clear();
+    }
+
+    protected void bindReplicationPackageBuilder(
+                    final ReplicationPackageBuilder replicationPackageBuilder,
+                    Map<String, Object> properties) {
+        synchronized (replicationPackageBuilders) {
+            replicationPackageBuilders.put(String.valueOf(properties.get("name")),
+                            replicationPackageBuilder);
+        }
+        log.debug("Registering Replication Package Builder {} ", replicationPackageBuilder);
+    }
+
+    protected void unbindReplicationPackageBuilder(
+                    final ReplicationPackageBuilder replicationPackageBuilder,
+                    Map<String, Object> properties) {
+        synchronized (replicationPackageBuilders) {
+            replicationPackageBuilders.remove(String.valueOf("name"));
+        }
+        log.debug("Unregistering Replication PackageBuilder {} ", replicationPackageBuilder);
+    }
+
+    public Collection<ReplicationPackageBuilder> getAvailableReplicationPackageBuilders() {
+        return replicationPackageBuilders.values();
+    }
+
+    public ReplicationPackageBuilder getReplicationPackageBuilder(String name) {
+        return replicationPackageBuilders.get(name);
+    }
+
+}

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/ReplicationPackageAdapterFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/ReplicationPackageAdapterFactory.java?rev=1549567&r1=1549566&r2=1549567&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/ReplicationPackageAdapterFactory.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/ReplicationPackageAdapterFactory.java Mon Dec  9 13:37:48 2013
@@ -26,17 +26,13 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.Service;
-import org.apache.jackrabbit.util.Text;
 import org.apache.sling.api.SlingHttpServletRequest;
 import org.apache.sling.api.adapter.AdapterFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.sling.replication.communication.ReplicationActionType;
-import org.apache.sling.replication.communication.ReplicationRequest;
 import org.apache.sling.replication.serialization.ReplicationPackage;
 import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
 import org.apache.sling.replication.serialization.ReplicationPackageBuilderProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * {@link AdapterFactory} for
@@ -47,9 +43,10 @@ import org.apache.sling.replication.seri
 @Properties({
         @Property(name = "adaptables", value = { "org.apache.sling.api.SlingHttpServletRequest" }),
         @Property(name = "adapters", value = { "org.apache.sling.replication.serialization.ReplicationPackage" }) })
+@Deprecated
 public class ReplicationPackageAdapterFactory implements AdapterFactory {
 
-    private Logger log = LoggerFactory.getLogger(getClass());
+    private final Logger log = LoggerFactory.getLogger(getClass());
 
     @Reference
     private ReplicationPackageBuilderProvider packageBuilderProvider;
@@ -64,20 +61,14 @@ public class ReplicationPackageAdapterFa
             if (adaptable instanceof SlingHttpServletRequest) {
                 SlingHttpServletRequest request = (SlingHttpServletRequest) adaptable;
                 String name = request.getHeader("Type");
-                ReplicationPackageBuilder replicationPacakageBuilder = packageBuilderProvider
-                                .getReplicationPacakageBuilder(name);
+                ReplicationPackageBuilder replicationPackageBuilder = packageBuilderProvider
+                                .getReplicationPackageBuilder(name);
                 if (log.isInfoEnabled()) {
-                    log.info("using {} package builder", replicationPacakageBuilder);
+                    log.info("using {} package builder", replicationPackageBuilder);
                 }
-                if (replicationPacakageBuilder != null) {
+                if (replicationPackageBuilder != null) {
                     InputStream stream = request.getInputStream();
-                    ReplicationActionType action = ReplicationActionType.fromName(request
-                                    .getHeader("Action"));
-                    String[] paths = Text.unescape(request.getHeader("Path")).replace("[", "").replace("]", "").split(", ");
-                    if (log.isInfoEnabled()) {
-                        log.info("action {} on paths {}", action, Arrays.toString(paths));
-                    }
-                    pkg = replicationPacakageBuilder.readPackage(new ReplicationRequest(System.currentTimeMillis(), action, paths), stream, true);
+                    pkg = replicationPackageBuilder.readPackage(stream, true);
                     if (pkg != null) {
                         if (log.isInfoEnabled()) {
                             log.info("package {} created", Arrays.toString(pkg.getPaths()));

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java?rev=1549567&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java Mon Dec  9 13:37:48 2013
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.serialization.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.vault.util.Text;
+import org.apache.sling.replication.communication.ReplicationActionType;
+import org.apache.sling.replication.communication.ReplicationRequest;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+
+/**
+ * A void {@link ReplicationPackage} is used for deletion of certain paths on the target instance
+ */
+public class VoidReplicationPackage implements ReplicationPackage {
+
+    private final String type;
+
+    private final String[] paths;
+
+    private final String id;
+
+    public VoidReplicationPackage(ReplicationRequest request, String type) {
+        this.type = type;
+        this.paths = request.getPaths();
+        this.id = ReplicationActionType.DELETE.toString() + ':' + Arrays.toString(request.getPaths()) + ':' + request.getTime();
+    }
+
+    public static VoidReplicationPackage fromStream(InputStream stream) throws IOException {
+        VoidReplicationPackage replicationPackage = null;
+        String streamString = IOUtils.toString(stream);
+        int beginIndex = streamString.indexOf(':');
+        int endIndex = streamString.lastIndexOf(':');
+        if (beginIndex >= 0 && endIndex > beginIndex && streamString.startsWith(ReplicationActionType.DELETE.toString())) {
+            String pathsArrayString = Text.unescape(streamString.substring(beginIndex + 1, endIndex - 1));
+            String[] paths = pathsArrayString.replaceAll("\\[", "").replaceAll("\\]", "").split(", ");
+            ReplicationRequest request = new ReplicationRequest(Long.valueOf(streamString.substring(streamString.lastIndexOf(':') + 1)),
+                    ReplicationActionType.DELETE, paths);
+            replicationPackage = new VoidReplicationPackage(request, "VOID");
+        }
+        return replicationPackage;
+    }
+
+
+    private static final long serialVersionUID = 1L;
+
+    public String getType() {
+        return type;
+    }
+
+    public String[] getPaths() {
+        return paths;
+    }
+
+    public long getLength() {
+        return id.getBytes().length;
+    }
+
+    public InputStream getInputStream() throws IOException {
+        return new ByteArrayInputStream(id.getBytes());
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public String getAction() {
+        return ReplicationActionType.DELETE.toString();
+    }
+
+}



Mime
View raw message