sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomm...@apache.org
Subject svn commit: r1615577 - in /sling/trunk/contrib/extensions/replication: core/src/main/java/org/apache/sling/replication/agent/impl/ core/src/main/java/org/apache/sling/replication/queue/ core/src/main/java/org/apache/sling/replication/queue/impl/jobhand...
Date Mon, 04 Aug 2014 14:36:17 GMT
Author: tommaso
Date: Mon Aug  4 14:36:16 2014
New Revision: 1615577

URL: http://svn.apache.org/r1615577
Log:
SLING-3816 - restored previous version

Removed:
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageExporter.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/exporter/
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/importer/
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationPackageExporterServlet.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/exporter/
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/importer/
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/author/
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/publish/
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/resourceproviders/configs/
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/resourceproviders/services/
    sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ReverseReplicationTest.java
Modified:
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItem.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/resources/ReplicationConstants.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/resources/impl/common/AbstractReadableResourceProvider.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilder.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageBuilder.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackageBuilder.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationPackageImporterServlet.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/TransportHandler.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandler.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandler.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandler.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactoryTest.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageTest.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerTest.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandlerTest.java
    sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ForwardReplicationTest.java
    sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ReplicationIntegrationTestBase.java
    sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ReplicationUtils.java

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java Mon Aug  4 14:36:16 2014
@@ -41,8 +41,10 @@ import org.apache.sling.replication.queu
 import org.apache.sling.replication.queue.impl.SingleQueueDistributionStrategy;
 import org.apache.sling.replication.queue.impl.jobhandling.JobHandlingReplicationQueueProvider;
 import org.apache.sling.replication.rule.ReplicationRuleEngine;
-import org.apache.sling.replication.serialization.ReplicationPackageExporter;
-import org.apache.sling.replication.serialization.ReplicationPackageImporter;
+import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
+import org.apache.sling.replication.serialization.impl.vlt.FileVaultReplicationPackageBuilder;
+import org.apache.sling.replication.transport.TransportHandler;
+import org.apache.sling.replication.transport.impl.NopTransportHandler;
 import org.apache.sling.settings.SlingSettingsService;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceReference;
@@ -67,10 +69,16 @@ public class ReplicationAgentServiceFact
 
     static final String SERVICE_PID = "org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory";
 
+    private static final String TRANSPORT = ReplicationAgentConfiguration.TRANSPORT;
+
     private static final String QUEUEPROVIDER = ReplicationAgentConfiguration.QUEUEPROVIDER;
 
+    private static final String PACKAGING = ReplicationAgentConfiguration.PACKAGING;
+
     private static final String QUEUE_DISTRIBUTION = ReplicationAgentConfiguration.QUEUE_DISTRIBUTION;
 
+    private static final String DEFAULT_PACKAGING = "(name="
+            + FileVaultReplicationPackageBuilder.NAME + ")";
 
     private static final String DEFAULT_QUEUEPROVIDER = "(name="
             + JobHandlingReplicationQueueProvider.NAME + ")";
@@ -78,6 +86,9 @@ public class ReplicationAgentServiceFact
     private static final String DEFAULT_DISTRIBUTION = "(name="
             + SingleQueueDistributionStrategy.NAME + ")";
 
+    private static final String DEFAULT_TRANSPORT = "(name="
+            + NopTransportHandler.NAME + ")";
+
     @Property(boolValue = true, label = "Enabled")
     private static final String ENABLED = ReplicationAgentConfiguration.ENABLED;
 
@@ -90,13 +101,13 @@ public class ReplicationAgentServiceFact
     @Property(boolValue = true, label = "Replicate using aggregated paths")
     private static final String USE_AGGREGATE_PATHS = ReplicationAgentConfiguration.USE_AGGREGATE_PATHS;
 
-    @Property(label = "Target ReplicationPackageExporter", name = "ReplicationPackageExporter.target", value = "(name=vlt)")
-    @Reference(name = "ReplicationPackageExporter", target = "(name=vlt)", policy = ReferencePolicy.DYNAMIC)
-    private ReplicationPackageExporter packageExporter;
-
-    @Property(label = "Target ReplicationPackageImporter", name = "ReplicationPackageImporter.target", value = "(name=default)")
-    @Reference(name = "ReplicationPackageImporter", target = "(name=default)", policy = ReferencePolicy.DYNAMIC)
-    private ReplicationPackageImporter packageImporter;
+    @Property(label = "Target TransportHandler", name = TRANSPORT, value = "(name=" + NopTransportHandler.NAME + ")")
+    @Reference(name = "TransportHandler", target = "(name=" + NopTransportHandler.NAME + ")", policy = ReferencePolicy.DYNAMIC)
+    private volatile TransportHandler transportHandler;
+
+    @Property(label = "Target ReplicationPackageBuilder", name = PACKAGING, value = DEFAULT_PACKAGING)
+    @Reference(name = "ReplicationPackageBuilder", target = DEFAULT_PACKAGING, policy = ReferencePolicy.DYNAMIC)
+    private volatile ReplicationPackageBuilder packageBuilder;
 
     @Property(label = "Target ReplicationQueueProvider", name = QUEUEPROVIDER, value = DEFAULT_QUEUEPROVIDER)
     @Reference(name = "ReplicationQueueProvider", target = DEFAULT_QUEUEPROVIDER, policy = ReferencePolicy.DYNAMIC)
@@ -138,6 +149,11 @@ public class ReplicationAgentServiceFact
                     .toString(config.get(NAME), String.valueOf(new Random().nextInt(1000)));
             props.put(NAME, name);
 
+            String transport = PropertiesUtil.toString(config.get(TRANSPORT), "");
+            props.put(TRANSPORT, transport);
+
+            String packaging = PropertiesUtil.toString(config.get(PACKAGING), "");
+            props.put(PACKAGING, packaging);
 
             String queue = PropertiesUtil.toString(config.get(QUEUEPROVIDER), "");
             props.put(QUEUEPROVIDER, queue);
@@ -152,22 +168,19 @@ public class ReplicationAgentServiceFact
             boolean useAggregatePaths = PropertiesUtil.toBoolean(config.get(USE_AGGREGATE_PATHS), true);
             props.put(USE_AGGREGATE_PATHS, useAggregatePaths);
 
-            boolean isPassive = PropertiesUtil.toBoolean(config.get("isPassive"), false);
-            props.put(USE_AGGREGATE_PATHS, useAggregatePaths);
-
             // check configuration is valid
-            if (name == null || packageExporter == null || packageImporter == null || queueProvider == null || queueDistributionStrategy == null) {
+            if (name == null || packageBuilder == null || queueProvider == null || queueDistributionStrategy == null) {
                 throw new AgentConfigurationException("configuration for this agent is not valid");
             }
 
 
             if (log.isInfoEnabled()) {
                 log.info("bound services for {} :  {} - {} - {} - {} - {} - {}", new Object[]{name,
-                        packageImporter, packageExporter, queueProvider, queueDistributionStrategy});
+                        transportHandler, packageBuilder, queueProvider, queueDistributionStrategy});
             }
 
-            ReplicationAgent agent = new SimpleReplicationAgent(name, rules, useAggregatePaths, isPassive,
-                    packageImporter, packageExporter, queueProvider, queueDistributionStrategy, replicationEventFactory, replicationRuleEngine);
+            ReplicationAgent agent = new SimpleReplicationAgent(name, rules, useAggregatePaths,
+                    transportHandler, packageBuilder, queueProvider, queueDistributionStrategy, replicationEventFactory, replicationRuleEngine);
 
 
             // only enable if instance runmodes match configured ones

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java Mon Aug  4 14:36:16 2014
@@ -18,6 +18,8 @@
  */
 package org.apache.sling.replication.agent.impl;
 
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Dictionary;
 import java.util.List;
@@ -37,10 +39,12 @@ import org.apache.sling.replication.queu
 import org.apache.sling.replication.queue.ReplicationQueueProvider;
 import org.apache.sling.replication.rule.ReplicationRuleEngine;
 import org.apache.sling.replication.serialization.ReplicationPackage;
+import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
 import org.apache.sling.replication.serialization.ReplicationPackageBuildingException;
-import org.apache.sling.replication.serialization.ReplicationPackageExporter;
-import org.apache.sling.replication.serialization.ReplicationPackageImporter;
 import org.apache.sling.replication.serialization.ReplicationPackageReadingException;
+import org.apache.sling.replication.transport.ReplicationTransportException;
+import org.apache.sling.replication.transport.TransportHandler;
+import org.apache.sling.replication.transport.impl.NopTransportHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,13 +53,15 @@ import org.slf4j.LoggerFactory;
  */
 public class SimpleReplicationAgent implements ReplicationAgent {
 
+    private final static String RESPONSE_QUEUE = "response";
+
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    private final ReplicationPackageBuilder packageBuilder;
+
     private final ReplicationQueueProvider queueProvider;
 
-    private final boolean passive;
-    private final ReplicationPackageImporter replicationPackageImporter;
-    private final ReplicationPackageExporter replicationPackageExporter;
+    private final TransportHandler transportHandler;
 
     private final ReplicationQueueDistributionStrategy queueDistributionStrategy;
 
@@ -71,17 +77,15 @@ public class SimpleReplicationAgent impl
 
     public SimpleReplicationAgent(String name, String[] rules,
                                   boolean useAggregatePaths,
-                                  boolean passive,
-                                  ReplicationPackageImporter replicationPackageImporter,
-                                  ReplicationPackageExporter replicationPackageExporter,
+                                  TransportHandler transportHandler,
+                                  ReplicationPackageBuilder packageBuilder,
                                   ReplicationQueueProvider queueProvider,
                                   ReplicationQueueDistributionStrategy queueDistributionHandler,
                                   ReplicationEventFactory replicationEventFactory, ReplicationRuleEngine ruleEngine) {
         this.name = name;
         this.rules = rules;
-        this.passive = passive;
-        this.replicationPackageImporter = replicationPackageImporter;
-        this.replicationPackageExporter = replicationPackageExporter;
+        this.transportHandler = transportHandler;
+        this.packageBuilder = packageBuilder;
         this.queueProvider = queueProvider;
         this.queueDistributionStrategy = queueDistributionHandler;
         this.useAggregatePaths = useAggregatePaths;
@@ -106,7 +110,7 @@ public class SimpleReplicationAgent impl
     }
 
     public boolean isPassive() {
-        return passive;
+        return transportHandler == null || transportHandler instanceof NopTransportHandler; // TODO : improve this
     }
 
 
@@ -114,7 +118,7 @@ public class SimpleReplicationAgent impl
         // create package from request
         ReplicationPackage replicationPackage;
         try {
-            replicationPackage = replicationPackageExporter.exportPackage(replicationRequest);
+            replicationPackage = packageBuilder.createPackage(replicationRequest);
         } catch (ReplicationPackageBuildingException e) {
             throw new AgentReplicationException(e);
         }
@@ -212,7 +216,7 @@ public class SimpleReplicationAgent impl
             ReplicationQueueItem info = queue.getHead();
             if (info != null) {
                 queue.removeHead();
-                replicationPackage = replicationPackageExporter.exportPackageById(info.getId());
+                replicationPackage = packageBuilder.getPackage(info.getId());
             }
             return replicationPackage;
         } else {
@@ -244,6 +248,7 @@ public class SimpleReplicationAgent impl
 
         if (!isPassive()) {
             queueProvider.enableQueueProcessing(getName(), new PackageQueueProcessor());
+            transportHandler.enableProcessing(getName(), new ResponseProcessor());
         }
     }
 
@@ -255,14 +260,15 @@ public class SimpleReplicationAgent impl
 
         if (!isPassive()) {
             queueProvider.disableQueueProcessing(getName());
+            transportHandler.disableProcessing(getName());
         }
     }
 
-    private boolean processTransportQueue(ReplicationQueueItem queueItem) {
-        log.debug("reading package from id {}", queueItem.getId());
+    private boolean processResponseQueue(ReplicationQueueItem queueItem) {
+        InputStream stream = new ByteArrayInputStream(queueItem.getBytes());
+        log.debug("reading package from stream {}", stream);
         try {
-            ReplicationPackage replicationPackage = replicationPackageExporter.exportPackageById(queueItem.getId());
-            replicationPackageImporter.importPackage(replicationPackage);
+            ReplicationPackage replicationPackage = packageBuilder.readPackage(stream, true);
             replicationPackage.delete();
             return true;
         } catch (ReplicationPackageReadingException e) {
@@ -270,11 +276,46 @@ public class SimpleReplicationAgent impl
         }
     }
 
+    private boolean processTransportQueue(ReplicationQueueItem queueItem) {
+        try {
+            ReplicationPackage replicationPackage = packageBuilder.getPackage(queueItem.getId());
+            if (replicationPackage == null) {
+                return false;
+            }
+            if (transportHandler != null) {
+                transportHandler.transport(getName(), replicationPackage);
+                replicationPackage.delete();
+                return true;
+            } else {
+                log.info("agent {} processing skipped", name);
+                return false;
+            }
+        } catch (ReplicationTransportException e) {
+            log.error("transport error", e);
+            return false;
+        }
+    }
+
     class PackageQueueProcessor implements ReplicationQueueProcessor {
         public boolean process(String queueName, ReplicationQueueItem packageInfo) {
             log.info("running package queue processor");
+            if (RESPONSE_QUEUE.equalsIgnoreCase(queueName)) {
+                return processResponseQueue(packageInfo);
+            } else {
+                return processTransportQueue(packageInfo);
+            }
+        }
+    }
 
-            return processTransportQueue(packageInfo);
+    class ResponseProcessor implements ReplicationQueueProcessor {
+        public boolean process(String queueName, ReplicationQueueItem queueItem) {
+            log.info("running response processor");
+            try {
+                return getQueue(RESPONSE_QUEUE).add(queueItem);
+            } catch (Exception e) {
+                return false;
+            }
         }
     }
+
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItem.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItem.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItem.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItem.java Mon Aug  4 14:36:16 2014
@@ -31,12 +31,22 @@ public class ReplicationQueueItem {
 
     private final String type;
 
+    private final byte[] bytes;
 
-    public ReplicationQueueItem(String id, String[] paths, String action, String type) {
+    private ReplicationQueueItem(String id, String[] paths, String action, String type, byte[] bytes) {
         this.id = id;
         this.paths = paths;
         this.action = action;
         this.type = type;
+        this.bytes = bytes;
+    }
+
+    public ReplicationQueueItem(String id, String[] paths, String action, String type) {
+        this(id, paths, action, type, null);
+    }
+
+    public ReplicationQueueItem(String[] paths, String action, String type, byte[] bytes) {
+        this(null, paths, action, type, bytes);
     }
 
     public String getId() {
@@ -54,4 +64,12 @@ public class ReplicationQueueItem {
     public String getType() {
         return type;
     }
+
+    public byte[] getBytes() {
+        return bytes;
+    }
+
+    public boolean isTransient(){
+        return id == null;
+    }
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java Mon Aug  4 14:36:16 2014
@@ -38,22 +38,37 @@ public class JobHandlingUtils {
 
     protected static final String ACTION = "replication.package.action";
 
+    protected static final String BYTES = "replication.package.bytes";
+
+
     public static ReplicationQueueItem getPackage(final Job job) {
-        return new ReplicationQueueItem((String) job.getProperty(ID),
-                (String[]) job.getProperty(PATHS),
-                String.valueOf(job.getProperty(ACTION)),
-                String.valueOf(job.getProperty(TYPE)));
+        String id = (String) job.getProperty(ID);
+        if (id != null) {
+            return new ReplicationQueueItem((String) job.getProperty(ID),
+                    (String[]) job.getProperty(PATHS),
+                    String.valueOf(job.getProperty(ACTION)),
+                    String.valueOf(job.getProperty(TYPE)));
+        } else {
+            return new ReplicationQueueItem((String[]) job.getProperty(PATHS),
+                    String.valueOf(job.getProperty(ACTION)),
+                    String.valueOf(job.getProperty(TYPE)),
+                    unBox((Byte[]) job.getProperty(BYTES)));
+        }
     }
 
     public static Map<String, Object> createFullPropertiesFromPackage(
             ReplicationQueueItem replicationPackage) {
         Map<String, Object> properties = new HashMap<String, Object>();
 
-        properties.put(ID, replicationPackage.getId());
+        if (replicationPackage.getId() != null)
+            properties.put(ID, replicationPackage.getId());
         properties.put(PATHS, replicationPackage.getPaths());
         properties.put(ACTION, replicationPackage.getAction());
         properties.put(TYPE, replicationPackage.getType());
 
+        if (replicationPackage.getBytes() != null)
+            properties.put(BYTES, box(replicationPackage.getBytes()));
+
         return properties;
     }
 

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/resources/ReplicationConstants.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/resources/ReplicationConstants.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/resources/ReplicationConstants.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/resources/ReplicationConstants.java Mon Aug  4 14:36:16 2014
@@ -20,16 +20,13 @@
 package org.apache.sling.replication.resources;
 
 public class ReplicationConstants {
-    public static final String SUFFIX_RESOURCE_LIST = "/list";
-    public static final String SUFFIX_AGENT_QUEUE = "/queue";
-    public static final String SUFFIX_AGENT_QUEUE_EVENT = "/queue/event";
-
     public static final String  AGENT_RESOURCE_TYPE = "replication/agent";
-    public static final String  AGENT_QUEUE_RESOURCE_TYPE = AGENT_RESOURCE_TYPE + SUFFIX_AGENT_QUEUE;
-    public static final String  AGENT_QUEUE_EVENT_RESOURCE_TYPE = AGENT_QUEUE_RESOURCE_TYPE + SUFFIX_AGENT_QUEUE_EVENT;
-    public static final String  AGENT_ROOT_RESOURCE_TYPE = AGENT_RESOURCE_TYPE + SUFFIX_RESOURCE_LIST;
-
+    public static final String  AGENT_QUEUE_RESOURCE_TYPE = "replication/agent/queue";
+    public static final String  AGENT_QUEUE_EVENT_RESOURCE_TYPE = "replication/agent/queue";
+    public static final String  AGENT_ROOT_RESOURCE_TYPE = "replication/agents";
+    public static final String  IMPORTER_ROOT_RESOURCE_TYPE = "replication/importers";
     public static final String  IMPORTER_RESOURCE_TYPE = "replication/importer";
-    public static final String  EXPORTER_RESOURCE_TYPE = "replication/exporter";
 
+    public static final String SUFFIX_AGENT_QUEUE = "/queue";
+    public static final String SUFFIX_AGENT_QUEUE_EVENT = "/queue/event";
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/resources/impl/common/AbstractReadableResourceProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/resources/impl/common/AbstractReadableResourceProvider.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/resources/impl/common/AbstractReadableResourceProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/resources/impl/common/AbstractReadableResourceProvider.java Mon Aug  4 14:36:16 2014
@@ -36,6 +36,7 @@ public abstract class AbstractReadableRe
     protected static final String ADAPTABLE_PROPERTY_NAME = "adaptable";
 
     private static final String MAIN_RESOURCE_PREFIX = ".";
+    private static final String ROOT_RESOURCE_PREFIX = "..";
 
     private final String resourceRoot;
     private final Map<String, Map<String, String>> additionalResourcePropertiesMap = new HashMap<String, Map<String, String>>();
@@ -45,11 +46,12 @@ public abstract class AbstractReadableRe
 
         this.resourceRoot = resourceRoot;
 
+        additionalResourcePropertiesMap.put(ROOT_RESOURCE_PREFIX, new HashMap<String, String>());
         additionalResourcePropertiesMap.put(MAIN_RESOURCE_PREFIX, new HashMap<String, String>());
         for (Map.Entry<String, String> entry : additionalResourceProperties.entrySet()) {
             String resourceName = MAIN_RESOURCE_PREFIX;
             String propertyName = entry.getKey();
-            int idx = propertyName.indexOf("/");
+            int idx =propertyName.indexOf("/");
             if (idx >=0) {
                 resourceName = propertyName.substring(0, idx);
                 propertyName = propertyName.substring(idx+1);
@@ -91,10 +93,8 @@ public abstract class AbstractReadableRe
             if (properties != null) {
                 Object adaptable = properties.remove(ADAPTABLE_PROPERTY_NAME);
 
-                Map<String, String> additionalProperties = additionalResourcePropertiesMap.get(MAIN_RESOURCE_PREFIX);
-                if (!properties.containsKey("sling:resourceType") && additionalProperties.containsKey("sling:resourceType")) {
-                    properties.put("sling:resourceType", additionalProperties.get("sling:resourceType") +"/list");
-                }
+                Map<String, String > additionalProperties = additionalResourcePropertiesMap.get(ROOT_RESOURCE_PREFIX);
+                properties.putAll(additionalProperties);
 
                 resource = new SimpleReadableResource(resourceResolver, pathInfo.getResourcePath(), properties, adaptable);
             }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilder.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilder.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilder.java Mon Aug  4 14:36:16 2014
@@ -40,11 +40,13 @@ public interface ReplicationPackageBuild
      * reads a stream and tries to convert it to a {@link ReplicationPackage} this provider can read and install
      *
      * @param stream  the {@link InputStream} of the package to read
+     * @param install if <code>true</code> then if the package can be read from the stream then it will try also
+     *                to install it into the repository
      * @return a {@link ReplicationPackage} if it can read it from the stream
      * @throws ReplicationPackageReadingException
      *          when the stream cannot be read as a {@link ReplicationPackage}
      */
-    ReplicationPackage readPackage(InputStream stream) throws ReplicationPackageReadingException;
+    ReplicationPackage readPackage(InputStream stream, boolean install) throws ReplicationPackageReadingException;
 
     /**
      * get an already created (and saved into the repository) {@link ReplicationPackage} by its id
@@ -54,12 +56,4 @@ public interface ReplicationPackageBuild
      */
     ReplicationPackage getPackage(String id);
 
-    /**
-     * Installs the given replicationPackage into the repository
-     * @param replicationPackage
-     * @return
-     * @throws ReplicationPackageReadingException
-     */
-    boolean installPackage(ReplicationPackage replicationPackage) throws ReplicationPackageReadingException;
-
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java Mon Aug  4 14:36:16 2014
@@ -24,21 +24,14 @@ import java.io.InputStream;
  * A {@link org.apache.sling.replication.serialization.ReplicationPackage} importer
  */
 public interface ReplicationPackageImporter {
-    /**
-     * Imports the given replication package
-     * @param replicationPackage - the package to be imported
-     * @return
-     */
-    boolean importPackage(ReplicationPackage replicationPackage) throws ReplicationPackageReadingException;
 
     /**
-     * reads a stream and tries to convert it to a {@link ReplicationPackage} this provider can read and install
+     * Synchronously import the stream of a {@link org.apache.sling.replication.serialization.ReplicationPackage}
      *
-     * @param stream  the {@link InputStream} of the package to read
-     * @return a {@link ReplicationPackage} if it can read it from the stream
-     * @throws ReplicationPackageReadingException
-     *          when the stream cannot be read as a {@link ReplicationPackage}
+     * @param stream the <code>InputStream</code> of the given <code>ReplicationPackage</code>
+     * @param type   the <code>String</code> representing the {@link ReplicationPackage#getType() type} of the given package
+     * @return <code>true</code> if successfully imported, <code>false</code> otherwise
      */
-    ReplicationPackage readPackage(InputStream stream) throws ReplicationPackageReadingException;
+    boolean importStream(InputStream stream, String type);
 
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java Mon Aug  4 14:36:16 2014
@@ -60,7 +60,8 @@ public abstract class AbstractReplicatio
     protected abstract ReplicationPackage createPackageForAdd(ReplicationRequest request)
             throws ReplicationPackageBuildingException;
 
-    public ReplicationPackage readPackage(InputStream stream) throws ReplicationPackageReadingException {
+    public ReplicationPackage readPackage(InputStream stream,
+                                          boolean install) throws ReplicationPackageReadingException {
         ReplicationPackage replicationPackage = null;
         if (!stream.markSupported()) {
             stream = new BufferedInputStream(stream);
@@ -71,35 +72,28 @@ public abstract class AbstractReplicatio
             int bytesRead = stream.read(buffer, 0, 6);
             stream.reset();
             String s = new String(buffer, "UTF-8");
-            log.info("read {} bytes as {}", bytesRead, s);
-
+            if (log.isInfoEnabled()) {
+                log.info("read {} bytes as {}", bytesRead, s);
+            }
             if (bytesRead > 0 && buffer[0] > 0 && s.startsWith("DEL")) {
-                replicationPackage = VoidReplicationPackage.fromStream(stream);
+                replicationPackage = readPackageForDelete(stream);
             }
         } catch (Exception e) {
             log.warn("cannot parse stream", e);
         }
         stream.mark(-1);
         if (replicationPackage == null) {
-            replicationPackage = readPackageInternal(stream);
+            replicationPackage = readPackageForAdd(stream, install);
         }
         return replicationPackage;
     }
 
-
-    public boolean installPackage(ReplicationPackage replicationPackage) throws ReplicationPackageReadingException {
-        ReplicationActionType actionType = ReplicationActionType.fromName(replicationPackage.getAction());
-        if (ReplicationActionType.DELETE.equals(actionType)) {
-            return installDeletePackage(replicationPackage);
-        }
-
-        return installPackageInternal(replicationPackage);
-
-    }
-
-    private boolean installDeletePackage(ReplicationPackage replicationPackage) throws ReplicationPackageReadingException {
+    private ReplicationPackage readPackageForDelete(InputStream stream) throws ReplicationPackageReadingException {
+        ReplicationPackage replicationPackage = null;
         Session session = null;
         try {
+            replicationPackage = VoidReplicationPackage.fromStream(stream);
+
             if(replicationPackage != null){
                 session = getSession();
                 for (String path : replicationPackage.getPaths()) {
@@ -108,7 +102,6 @@ public abstract class AbstractReplicatio
                     }
                 }
                 session.save();
-                return true;
             }
         } catch (Exception e) {
             throw new ReplicationPackageReadingException(e);
@@ -118,7 +111,7 @@ public abstract class AbstractReplicatio
             }
         }
 
-        return false;
+        return replicationPackage;
     }
 
     public ReplicationPackage getPackage(String id) {
@@ -140,13 +133,10 @@ public abstract class AbstractReplicatio
 
     protected abstract Session getSession() throws RepositoryException;
 
-    protected abstract ReplicationPackage readPackageInternal(InputStream stream)
+    protected abstract ReplicationPackage readPackageForAdd(InputStream stream, boolean install)
             throws ReplicationPackageReadingException;
 
 
-    protected abstract boolean installPackageInternal(ReplicationPackage replicationPackage)
-            throws ReplicationPackageReadingException;
-
     protected abstract ReplicationPackage getPackageInternal(String id);
 
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java Mon Aug  4 14:36:16 2014
@@ -48,7 +48,7 @@ public class VoidReplicationPackage impl
         this.paths = request.getPaths();
         this.action = request.getAction().toString();
         this.id = request.getAction().toString()
-                + ':' + Arrays.toString(request.getPaths()).replaceAll("\\[", "").replaceAll("\\]", "")
+                + ':' + Arrays.toString(request.getPaths())
                 + ':' + request.getTime()
                 + ':' + type;
     }
@@ -70,7 +70,7 @@ public class VoidReplicationPackage impl
         VoidReplicationPackage replicationPackage = null;
         if(replicationActionType != null){
             pathsString = Text.unescape(pathsString);
-            String[] paths = pathsString.split(", ");
+            String[] paths = pathsString.replaceAll("\\[", "").replaceAll("\\]", "").split(", ");
 
             ReplicationRequest request = new ReplicationRequest(Long.valueOf(timeString),
                     replicationActionType, paths);

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageBuilder.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageBuilder.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageBuilder.java Mon Aug  4 14:36:16 2014
@@ -41,7 +41,7 @@ public class VoidReplicationPackageBuild
         return new VoidReplicationPackage(request, "VOID");
     }
 
-    public ReplicationPackage readPackage(InputStream stream) throws ReplicationPackageReadingException {
+    public ReplicationPackage readPackage(InputStream stream, boolean install) throws ReplicationPackageReadingException {
         try {
             return VoidReplicationPackage.fromStream(stream);
         } catch (Exception e) {
@@ -57,8 +57,4 @@ public class VoidReplicationPackageBuild
             return null;
         }
     }
-
-    public boolean installPackage(ReplicationPackage replicationPackage) {
-        return false;
-    }
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackageBuilder.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackageBuilder.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackageBuilder.java Mon Aug  4 14:36:16 2014
@@ -34,7 +34,10 @@ import org.apache.jackrabbit.vault.fs.ap
 import org.apache.jackrabbit.vault.fs.config.DefaultMetaInf;
 import org.apache.jackrabbit.vault.fs.config.DefaultWorkspaceFilter;
 import org.apache.jackrabbit.vault.fs.io.ImportOptions;
-import org.apache.jackrabbit.vault.packaging.*;
+import org.apache.jackrabbit.vault.packaging.ExportOptions;
+import org.apache.jackrabbit.vault.packaging.JcrPackage;
+import org.apache.jackrabbit.vault.packaging.Packaging;
+import org.apache.jackrabbit.vault.packaging.VaultPackage;
 import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.jcr.api.SlingRepository;
 import org.apache.sling.replication.communication.ReplicationRequest;
@@ -135,7 +138,7 @@ public class FileVaultReplicationPackage
     }
 
     @Override
-    protected ReplicationPackage readPackageInternal(final InputStream stream)
+    protected ReplicationPackage readPackageForAdd(final InputStream stream, boolean install)
             throws ReplicationPackageReadingException {
         if (log.isDebugEnabled()) {
             log.debug("reading a stream");
@@ -147,7 +150,9 @@ public class FileVaultReplicationPackage
             if (session != null) {
                 final JcrPackage jcrPackage = packaging.getPackageManager(session).upload(stream, true,
                         false);
-
+                if (install) {
+                    jcrPackage.install(new ImportOptions());
+                }
                 pkg = new FileVaultReplicationPackage(jcrPackage.getPackage());
             }
         } catch (Exception e) {
@@ -161,7 +166,6 @@ public class FileVaultReplicationPackage
         return pkg;
     }
 
-
     @Override
     protected ReplicationPackage getPackageInternal(String id) {
         ReplicationPackage replicationPackage = null;
@@ -171,10 +175,6 @@ public class FileVaultReplicationPackage
                 VaultPackage pkg = packaging.getPackageManager().open(file);
                 replicationPackage = new FileVaultReplicationPackage(pkg);
             }
-            else {
-                VaultPackage pkg = packaging.getPackageManager(getSession()).open(PackageId.fromString(id)).getPackage();
-                replicationPackage = new FileVaultReplicationPackage(pkg);
-            }
         } catch (Exception e) {
             log.info("could not find a package with id : {}", id);
         }
@@ -189,28 +189,4 @@ public class FileVaultReplicationPackage
     }
 
 
-    @Override
-    public boolean installPackageInternal(ReplicationPackage replicationPackage) throws ReplicationPackageReadingException{
-        log.debug("reading a stream");
-
-        Session session = null;
-        try {
-            session = getSession();
-            if (session != null) {
-                final JcrPackage jcrPackage =  packaging.getPackageManager(getSession())
-                        .open(PackageId.fromString(replicationPackage.getId()));
-
-                jcrPackage.install(new ImportOptions());
-
-            }
-        } catch (Exception e) {
-            log.error("could not read / install the package", e);
-            throw new ReplicationPackageReadingException(e);
-        } finally {
-            if (session != null) {
-                session.logout();
-            }
-        }
-        return false;
-    }
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java Mon Aug  4 14:36:16 2014
@@ -62,6 +62,18 @@ public class ReplicationAgentServlet ext
     @Override
     protected void doPost(SlingHttpServletRequest request, SlingHttpServletResponse response)
             throws ServletException, IOException {
+        String action = request.getHeader(ReplicationHeader.ACTION.toString());
+
+        if(ReplicationActionType.POLL.getName().equalsIgnoreCase(action)){
+            doRemove(request, response);
+        }
+        else {
+            doCreate(request, response);
+        }
+    }
+
+    private void doCreate(SlingHttpServletRequest request, SlingHttpServletResponse response)
+            throws IOException {
 
         response.setContentType("application/json");
 
@@ -99,6 +111,56 @@ public class ReplicationAgentServlet ext
         }
     }
 
+    private void doRemove(SlingHttpServletRequest request, SlingHttpServletResponse response) {
+
+        response.setContentType(ContentType.APPLICATION_OCTET_STREAM.toString());
+
+        String queueName = request.getParameter(ReplicationParameter.QUEUE.toString());
+
+        ReplicationAgent agent = request.getResource().adaptTo(ReplicationAgent.class);
+
+        /* directly polling an agent queue is only possible if such an agent doesn't have its own endpoint
+        (that is it just adds items to its queue to be polled remotely)*/
+        if (agent != null) {
+            try {
+                // TODO : consider using queue distribution strategy and validating who's making this request
+                log.info("getting item from queue {}", queueName);
+
+                // get first item
+                ReplicationPackage head = agent.removeHead(queueName);
+
+                if (head != null) {
+                    InputStream inputStream = null;
+                    int bytesCopied = -1;
+                    try {
+                        inputStream = head.createInputStream();
+                        bytesCopied = IOUtils.copy(inputStream, response.getOutputStream());
+                    }
+                    finally {
+                        IOUtils.closeQuietly(inputStream);
+                    }
+
+                    setPackageHeaders(response, head);
+
+                    // delete the package permanently
+                    head.delete();
+
+                    log.info("{} bytes written into the response", bytesCopied);
+
+                } else {
+                    log.info("nothing to fetch");
+                }
+            } catch (Exception e) {
+                response.setStatus(503);
+                log.error("error while reverse replicating from agent", e);
+            }
+            // everything ok
+            response.setStatus(200);
+        } else {
+            response.setStatus(404);
+        }
+    }
+
     String[] toStringArray(Enumeration<String> e){
         List<String> l = new ArrayList<String>();
         while (e.hasMoreElements()){
@@ -106,5 +168,15 @@ public class ReplicationAgentServlet ext
         }
 
         return l.toArray(new String[l.size()]);
+
+    }
+
+    void setPackageHeaders(SlingHttpServletResponse response, ReplicationPackage replicationPackage){
+        response.setHeader(ReplicationHeader.TYPE.toString(), replicationPackage.getType());
+        response.setHeader(ReplicationHeader.ACTION.toString(), replicationPackage.getAction());
+        for (String path : replicationPackage.getPaths()){
+            response.setHeader(ReplicationHeader.PATH.toString(), path);
+        }
+
     }
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationPackageImporterServlet.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationPackageImporterServlet.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationPackageImporterServlet.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationPackageImporterServlet.java Mon Aug  4 14:36:16 2014
@@ -31,7 +31,6 @@ import org.apache.sling.api.SlingHttpSer
 import org.apache.sling.api.servlets.SlingAllMethodsServlet;
 import org.apache.sling.replication.communication.ReplicationHeader;
 import org.apache.sling.replication.resources.ReplicationConstants;
-import org.apache.sling.replication.serialization.ReplicationPackage;
 import org.apache.sling.replication.serialization.ReplicationPackageImporter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,17 +62,23 @@ public class ReplicationPackageImporterS
         response.setCharacterEncoding("utf-8");
 
         InputStream stream = request.getInputStream();
+        String type = request.getHeader(ReplicationHeader.TYPE.toString());
         try {
-            ReplicationPackage replicationPackage = replicationPackageImporter.readPackage(stream);
-            success = replicationPackageImporter.importPackage(replicationPackage);
+           success = replicationPackageImporter.importStream(stream, type);
         } catch (final Exception e) {
             response.setStatus(400);
-            log.error("Error during replication: {}", e.getMessage(), e);
+            if (log.isErrorEnabled()) {
+                log.error("Error during replication: {}", e.getMessage(), e);
+            }
             response.getWriter().print("error: " + e.toString());
         } finally {
             final long end = System.currentTimeMillis();
-            log.info("Processed replication request in {}ms: : {}", new Object[]{end - start, success});
+            if (log.isInfoEnabled()) {
+                log.info("Processed replication request in {}ms: : {}", new Object[]{
+                        end - start, success});
+            }
         }
+
     }
 
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/TransportHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/TransportHandler.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/TransportHandler.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/TransportHandler.java Mon Aug  4 14:36:16 2014
@@ -19,6 +19,7 @@
 package org.apache.sling.replication.transport;
 
 import org.apache.sling.replication.communication.ReplicationEndpoint;
+import org.apache.sling.replication.queue.ReplicationQueueProcessor;
 import org.apache.sling.replication.serialization.ReplicationPackage;
 import org.apache.sling.replication.transport.authentication.TransportAuthenticationProvider;
 
@@ -32,9 +33,28 @@ public interface TransportHandler {
      * Executes the transport of a given {@link ReplicationPackage} to a specific {@link ReplicationEndpoint} using this
      * transport and the supplied {@link TransportAuthenticationProvider} for authenticating the endpoint
      *
+     * @param  agentName a replication agent name
      * @param replicationPackage  a {@link ReplicationPackage} to transport
      * @throws ReplicationTransportException if any error occurs during the transport
      */
-    void transport(ReplicationPackage replicationPackage) throws ReplicationTransportException;
+    void transport(String agentName, ReplicationPackage replicationPackage) throws ReplicationTransportException;
+
+
+    /**
+     * Enables response processing for this <code>TransportHandler</code> for a certain <code>ReplicationAgent</code>
+     *
+     * @param agentName the name of the <code>ReplicationAgent</code>
+     * @param responseProcessor a <code>ReplicationQueueProcessor</code> that is called by the <code>TransportHandler</code>
+     *                          whenever a response is received
+     */
+    void enableProcessing(String agentName, ReplicationQueueProcessor responseProcessor);
+
+    /**
+     *
+     * Disables response processing for this <code>TransportHandler</code>
+     *
+     * @param agentName the name of the <code>ReplicationAgent</code>
+     */
+    void disableProcessing(String agentName);
 
 }
\ No newline at end of file

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandler.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandler.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandler.java Mon Aug  4 14:36:16 2014
@@ -34,14 +34,18 @@ public abstract class AbstractTransportH
     private int lastSuccessfulEnpointId = 0;
 
 
+    private final Map<String, ReplicationQueueProcessor> responseProcessorMap = new ConcurrentHashMap<String, ReplicationQueueProcessor>();
+
     public AbstractTransportHandler(ReplicationEndpoint[] endpoints, TransportEndpointStrategyType endpointStrategyType) {
         this.endpoints = endpoints;
         this.endpointStrategyType = endpointStrategyType;
     }
 
-    public void transport(ReplicationPackage replicationPackage)
+    public void transport(String agentName, ReplicationPackage replicationPackage)
             throws ReplicationTransportException {
 
+        ReplicationQueueProcessor responseProcessor = responseProcessorMap.get(agentName);
+
         ReplicationTransportException lastException = null;
         int offset = 0;
         if (endpointStrategyType.equals(TransportEndpointStrategyType.OneSuccessful)) {
@@ -53,7 +57,7 @@ public abstract class AbstractTransportH
 
             ReplicationEndpoint replicationEndpoint = endpoints[currentId];
             try {
-                deliverPackage(replicationPackage, replicationEndpoint);
+                deliverPackage(replicationPackage, replicationEndpoint, responseProcessor);
                 lastSuccessfulEnpointId = currentId;
                 if (endpointStrategyType.equals(TransportEndpointStrategyType.FirstSuccessful) ||
                         endpointStrategyType.equals(TransportEndpointStrategyType.OneSuccessful))
@@ -68,22 +72,32 @@ public abstract class AbstractTransportH
     }
 
 
+    public void enableProcessing(String agentName, ReplicationQueueProcessor responseProcessor){
+        responseProcessorMap.put(agentName, responseProcessor);
+    }
+
+    public void disableProcessing(String agentName){
+        responseProcessorMap.remove(agentName);
+    }
+
 
     private void deliverPackage(ReplicationPackage replicationPackage,
-                                ReplicationEndpoint replicationEndpoint)
+                                ReplicationEndpoint replicationEndpoint,
+                                ReplicationQueueProcessor responseProcessor)
             throws ReplicationTransportException {
         if (!validateEndpoint(replicationEndpoint))
             throw new ReplicationTransportException("invalid endpoint " + replicationEndpoint.getUri());
 
         try {
-            deliverPackageToEndpoint(replicationPackage, replicationEndpoint);
+            deliverPackageToEndpoint(replicationPackage, replicationEndpoint, responseProcessor);
         } catch (Exception e) {
             throw new ReplicationTransportException(e);
         }
     }
 
     protected abstract void deliverPackageToEndpoint(ReplicationPackage replicationPackage,
-                                                     ReplicationEndpoint replicationEndpoint)
+                                                     ReplicationEndpoint replicationEndpoint,
+                                                     ReplicationQueueProcessor responseProcessor)
             throws Exception;
 
     protected abstract boolean validateEndpoint(ReplicationEndpoint endpoint);

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandler.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandler.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandler.java Mon Aug  4 14:36:16 2014
@@ -80,7 +80,7 @@ public class HttpTransportHandler extend
 
     @Override
     public void deliverPackageToEndpoint(ReplicationPackage replicationPackage,
-                                         ReplicationEndpoint replicationEndpoint) throws Exception {
+                                         ReplicationEndpoint replicationEndpoint, ReplicationQueueProcessor responseProcessor) throws Exception {
         log.info("delivering package {} to {} using auth {}",
                 new Object[]{replicationPackage.getId(),
                         replicationEndpoint.getUri(), transportAuthenticationProvider});

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandler.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandler.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandler.java Mon Aug  4 14:36:16 2014
@@ -64,7 +64,7 @@ public class RepositoryTransportHandler 
     }
 
     @Override
-    public void deliverPackageToEndpoint(ReplicationPackage replicationPackage, ReplicationEndpoint replicationEndpoint)
+    public void deliverPackageToEndpoint(ReplicationPackage replicationPackage, ReplicationEndpoint replicationEndpoint, ReplicationQueueProcessor responseProcessor)
             throws Exception {
 
         Session session = null;

Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactoryTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactoryTest.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactoryTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactoryTest.java Mon Aug  4 14:36:16 2014
@@ -25,8 +25,6 @@ import org.apache.sling.replication.agen
 import org.apache.sling.replication.queue.ReplicationQueueDistributionStrategy;
 import org.apache.sling.replication.queue.ReplicationQueueProvider;
 import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
-import org.apache.sling.replication.serialization.ReplicationPackageExporter;
-import org.apache.sling.replication.serialization.ReplicationPackageImporter;
 import org.apache.sling.replication.transport.TransportHandler;
 import org.junit.Test;
 import org.osgi.framework.BundleContext;
@@ -43,17 +41,10 @@ public class ReplicationAgentServiceFact
     public void testActivationWithAllServicesAndPropertiesBound() throws Exception {
         ReplicationAgentServiceFactory serviceFactory = new ReplicationAgentServiceFactory();
 
-        Field packageImporterField = serviceFactory.getClass().getDeclaredField("packageImporter");
-        packageImporterField.setAccessible(true);
-        ReplicationPackageImporter packageImporter = mock(ReplicationPackageImporter.class);
-        packageImporterField.set(serviceFactory, packageImporter);
-
-
-        Field packageExporterField = serviceFactory.getClass().getDeclaredField("packageExporter");
-        packageExporterField.setAccessible(true);
-        ReplicationPackageExporter packageExporter = mock(ReplicationPackageExporter.class);
-        packageExporterField.set(serviceFactory, packageExporter);
-
+        Field packageBuilderField = serviceFactory.getClass().getDeclaredField("packageBuilder");
+        packageBuilderField.setAccessible(true);
+        ReplicationPackageBuilder packageBuilder = mock(ReplicationPackageBuilder.class);
+        packageBuilderField.set(serviceFactory, packageBuilder);
 
         Field distributionField = serviceFactory.getClass().getDeclaredField("queueDistributionStrategy");
         distributionField.setAccessible(true);
@@ -65,6 +56,11 @@ public class ReplicationAgentServiceFact
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         queueField.set(serviceFactory, queueProvider);
 
+        Field transportField = serviceFactory.getClass().getDeclaredField("transportHandler");
+        transportField.setAccessible(true);
+        TransportHandler transportHandler = mock(TransportHandler.class);
+        transportField.set(serviceFactory, transportHandler);
+
 
         Map<String, Object> dictionary = new HashMap<String, Object>();
         dictionary.put("endpoint", "http://somewhere.com");

Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java Mon Aug  4 14:36:16 2014
@@ -30,8 +30,6 @@ import org.apache.sling.replication.queu
 import org.apache.sling.replication.queue.impl.simple.SimpleReplicationQueue;
 import org.apache.sling.replication.serialization.ReplicationPackage;
 import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
-import org.apache.sling.replication.serialization.ReplicationPackageExporter;
-import org.apache.sling.replication.serialization.ReplicationPackageImporter;
 import org.apache.sling.replication.transport.TransportHandler;
 import org.junit.Test;
 
@@ -52,20 +50,17 @@ public class SimpleReplicationAgentTest 
     public void testSyncReplicationWithFailingDistributionStrategy() throws Exception {
         String name = "sample-agent";
         TransportHandler transportHandler = mock(TransportHandler.class);
-        ReplicationPackageImporter packageImporter = mock(ReplicationPackageImporter.class);
-        ReplicationPackageExporter packageExporter = mock(ReplicationPackageExporter.class);
-
-
+        ReplicationPackageBuilder packageBuilder = mock(ReplicationPackageBuilder.class);
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueueDistributionStrategy distributionHandler = mock(ReplicationQueueDistributionStrategy.class);
         ReplicationEventFactory replicationEventFactory = mock(ReplicationEventFactory.class);
         SimpleReplicationAgent agent = new SimpleReplicationAgent(name, new String[0], true,
-                false, packageImporter, packageExporter, queueProvider, distributionHandler, replicationEventFactory, null);
+                transportHandler, packageBuilder, queueProvider, distributionHandler, replicationEventFactory, null);
         ReplicationRequest request = new ReplicationRequest(System.nanoTime(),
                 ReplicationActionType.ADD, "/");
         ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
         when(replicationPackage.getPaths()).thenReturn(new String[]{"/"});
-        when(packageExporter.exportPackage(request)).thenReturn(replicationPackage);
+        when(packageBuilder.createPackage(request)).thenReturn(replicationPackage);
         when(queueProvider.getDefaultQueue(agent.getName())).thenReturn(
                 new SimpleReplicationQueue(agent.getName(), "name"));
         ReplicationResponse response = agent.execute(request);
@@ -77,12 +72,12 @@ public class SimpleReplicationAgentTest 
     public void testSyncReplicationWithWorkingDistributionStrategy() throws Exception {
         String name = "sample-agent";
         TransportHandler transportHandler = mock(TransportHandler.class);
-        ReplicationPackageImporter packageImporter = mock(ReplicationPackageImporter.class);
-        ReplicationPackageExporter packageExporter = mock(ReplicationPackageExporter.class);        ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
+        ReplicationPackageBuilder packageBuilder = mock(ReplicationPackageBuilder.class);
+        ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueueDistributionStrategy distributionHandler = mock(ReplicationQueueDistributionStrategy.class);
         ReplicationEventFactory replicationEventFactory = mock(ReplicationEventFactory.class);
         SimpleReplicationAgent agent = new SimpleReplicationAgent(name, new String[0], true,
-                false, packageImporter, packageExporter, queueProvider, distributionHandler, replicationEventFactory, null);
+                transportHandler, packageBuilder, queueProvider, distributionHandler, replicationEventFactory, null);
         ReplicationRequest request = new ReplicationRequest(System.nanoTime(),
                 ReplicationActionType.ADD, "/");
         ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
@@ -90,7 +85,7 @@ public class SimpleReplicationAgentTest 
         ReplicationQueueItemState state = new ReplicationQueueItemState();
         state.setItemState(ReplicationQueueItemState.ItemState.SUCCEEDED);
         when(distributionHandler.add(any(String.class), any(ReplicationQueueItem.class), eq(queueProvider))).thenReturn(state);
-        when(packageExporter.exportPackage(any(ReplicationRequest.class))).thenReturn(replicationPackage);
+        when(packageBuilder.createPackage(any(ReplicationRequest.class))).thenReturn(replicationPackage);
         when(queueProvider.getDefaultQueue(agent.getName())).thenReturn(
                 new SimpleReplicationQueue(agent.getName(), "name"));
         ReplicationResponse response = agent.execute(request);
@@ -102,17 +97,17 @@ public class SimpleReplicationAgentTest 
     public void testAsyncReplication() throws Exception {
         String name = "sample-agent";
         TransportHandler transportHandler = mock(TransportHandler.class);
-        ReplicationPackageImporter packageImporter = mock(ReplicationPackageImporter.class);
-        ReplicationPackageExporter packageExporter = mock(ReplicationPackageExporter.class);        ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
+        ReplicationPackageBuilder packageBuilder = mock(ReplicationPackageBuilder.class);
+        ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueueDistributionStrategy distributionHandler = mock(ReplicationQueueDistributionStrategy.class);
         ReplicationEventFactory replicationEventFactory = mock(ReplicationEventFactory.class);
         SimpleReplicationAgent agent = new SimpleReplicationAgent(name, new String[0], true,
-                false, packageImporter, packageExporter, queueProvider, distributionHandler, replicationEventFactory, null);
+                transportHandler, packageBuilder, queueProvider, distributionHandler, replicationEventFactory, null);
         ReplicationRequest request = new ReplicationRequest(System.nanoTime(),
                 ReplicationActionType.ADD, "/");
         ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
         when(replicationPackage.getPaths()).thenReturn(new String[]{"/"});
-        when(packageExporter.exportPackage(request)).thenReturn(replicationPackage);
+        when(packageBuilder.createPackage(request)).thenReturn(replicationPackage);
         when(queueProvider.getDefaultQueue(agent.getName())).thenReturn(
                 new SimpleReplicationQueue(agent.getName(), "name"));
         agent.send(request);
@@ -122,12 +117,11 @@ public class SimpleReplicationAgentTest 
     public void testGetDefaultQueue() throws Exception {
         String name = "sample-agent";
         TransportHandler transportHandler = mock(TransportHandler.class);
-        ReplicationPackageImporter packageImporter = mock(ReplicationPackageImporter.class);
-        ReplicationPackageExporter packageExporter = mock(ReplicationPackageExporter.class);
+        ReplicationPackageBuilder packageBuilder = mock(ReplicationPackageBuilder.class);
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueueDistributionStrategy distributionHandler = mock(ReplicationQueueDistributionStrategy.class);
         SimpleReplicationAgent agent = new SimpleReplicationAgent(name, new String[0], true,
-                false, packageImporter, packageExporter, queueProvider, distributionHandler, null, null);
+                transportHandler, packageBuilder, queueProvider, distributionHandler, null, null);
         ReplicationQueue queue = mock(ReplicationQueue.class);
         when(queueProvider.getDefaultQueue(agent.getName())).thenReturn(queue);
         assertNotNull(agent.getQueue(null));
@@ -137,12 +131,11 @@ public class SimpleReplicationAgentTest 
     public void testGetExistingNamedQueue() throws Exception {
         String name = "sample-agent";
         TransportHandler transportHandler = mock(TransportHandler.class);
-        ReplicationPackageImporter packageImporter = mock(ReplicationPackageImporter.class);
-        ReplicationPackageExporter packageExporter = mock(ReplicationPackageExporter.class);
+        ReplicationPackageBuilder packageBuilder = mock(ReplicationPackageBuilder.class);
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueueDistributionStrategy distributionHandler = mock(ReplicationQueueDistributionStrategy.class);
         SimpleReplicationAgent agent = new SimpleReplicationAgent(name, new String[0], true,
-                false, packageImporter, packageExporter, queueProvider, distributionHandler, null, null);
+                transportHandler, packageBuilder, queueProvider, distributionHandler, null, null);
         ReplicationQueue queue = mock(ReplicationQueue.class);
         when(queueProvider.getQueue(agent.getName(), "priority")).thenReturn(queue);
         assertNotNull(agent.getQueue("priority"));
@@ -152,12 +145,11 @@ public class SimpleReplicationAgentTest 
     public void testGetNonExistingNamedQueue() throws Exception {
         String name = "sample-agent";
         TransportHandler transportHandler = mock(TransportHandler.class);
-        ReplicationPackageImporter packageImporter = mock(ReplicationPackageImporter.class);
-        ReplicationPackageExporter packageExporter = mock(ReplicationPackageExporter.class);
+        ReplicationPackageBuilder packageBuilder = mock(ReplicationPackageBuilder.class);
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueueDistributionStrategy distributionHandler = mock(ReplicationQueueDistributionStrategy.class);
         SimpleReplicationAgent agent = new SimpleReplicationAgent(name, new String[0], true,
-                false, packageImporter, packageExporter, queueProvider, distributionHandler, null, null);
+                transportHandler, packageBuilder, queueProvider, distributionHandler, null, null);
         ReplicationQueue queue = mock(ReplicationQueue.class);
         when(queueProvider.getQueue(agent.getName(), "priority")).thenReturn(queue);
         assertNull(agent.getQueue("weird"));

Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageTest.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageTest.java Mon Aug  4 14:36:16 2014
@@ -37,7 +37,7 @@ public class VoidReplicationPackageTest 
     public void testCreatedAndReadPackagesEquality() throws Exception {
         ReplicationRequest request = new ReplicationRequest(123l, ReplicationActionType.DELETE, "/abc");
         VoidReplicationPackage createdPackage = new VoidReplicationPackage(request, "VOID");
-        VoidReplicationPackage readPackage = VoidReplicationPackage.fromStream(new ByteArrayInputStream("DELETE:/abc:123:VOID".getBytes()));
+        VoidReplicationPackage readPackage = VoidReplicationPackage.fromStream(new ByteArrayInputStream("DELETE:[/abc]:123:VOID".getBytes()));
         assertEquals(createdPackage.getId(), readPackage.getId());
         assertEquals(createdPackage.getAction(), readPackage.getAction());
         assertEquals(createdPackage.getType(), readPackage.getType());

Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerTest.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerTest.java Mon Aug  4 14:36:16 2014
@@ -62,7 +62,7 @@ public class HttpTransportHandlerTest {
         when(response.returnContent()).thenReturn(content);
         when(executor.execute(any(Request.class))).thenReturn(response);
         when(transportAuthenticationProvider.authenticate(any(Executor.class), any(TransportAuthenticationContext.class))).thenReturn(executor);
-        httpTransportHandler.transport(replicationPackage);
+        httpTransportHandler.transport("agentName", replicationPackage);
     }
 
     @Test
@@ -87,7 +87,7 @@ public class HttpTransportHandlerTest {
         when(executor.execute(any(Request.class))).thenReturn(response);
         when(transportAuthenticationProvider.authenticate(any(Executor.class), any(TransportAuthenticationContext.class))).thenReturn(executor);
 
-        httpTransportHandler.transport(replicationPackage);
+        httpTransportHandler.transport("agentName", replicationPackage);
 
         verify(executor, times(1)).execute(any(Request.class));
     }

Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandlerTest.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandlerTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandlerTest.java Mon Aug  4 14:36:16 2014
@@ -48,7 +48,7 @@ public class RepositoryTransportHandlerT
                 transportAuthenticationProvider,
                 new ReplicationEndpoint[] {  new ReplicationEndpoint("repo://var/outbox/replication/rev1") });
         try {
-            handler.transport(null);
+            handler.transport("agentName", null);
             fail("cannot deliver without a proper session");
         } catch (ReplicationTransportException re) {
             // failure expected
@@ -84,6 +84,6 @@ public class RepositoryTransportHandlerT
         ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
         when(replicationPackage.getId()).thenReturn("some-id");
         when(replicationPackage.getPaths()).thenReturn(new String[]{"/apps", "/libs"});
-        handler.transport(replicationPackage);
+        handler.transport("agentName", replicationPackage);
     }
 }

Modified: sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ForwardReplicationTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ForwardReplicationTest.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ForwardReplicationTest.java (original)
+++ sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ForwardReplicationTest.java Mon Aug  4 14:36:16 2014
@@ -37,8 +37,8 @@ public class ForwardReplicationTest exte
 
     @Test
     public void testDeleteContent() throws Exception {
-        String nodePath = createRandomNode(publishClient, "/content");
-
+        String nodePath = createRandomNode(authorClient, "/content");
+        replicate(author, "publish", ReplicationActionType.ADD, nodePath);
         assertExists(publishClient, nodePath);
 
         replicate(author, "publish", ReplicationActionType.DELETE, nodePath);

Modified: sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ReplicationIntegrationTestBase.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ReplicationIntegrationTestBase.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ReplicationIntegrationTestBase.java (original)
+++ sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ReplicationIntegrationTestBase.java Mon Aug  4 14:36:16 2014
@@ -24,6 +24,8 @@ import org.apache.sling.testing.tools.sl
 import org.apache.sling.testing.tools.sling.SlingInstanceManager;
 
 import static org.apache.sling.replication.it.ReplicationUtils.assertExists;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Integration test base class for replication
@@ -47,13 +49,15 @@ public abstract class ReplicationIntegra
 
         try {
             // change the url for publish agent and wait for it to start
-            String receiverUrl = "http://localhost:4503/libs/sling/replication/importers/default"
+
+            String receiverUrl = "http://localhost:4503/libs/sling/replication/importer/default"
                     .replace("http://localhost:4503", publish.getServerBaseUrl());
-            authorClient.setProperties("/libs/sling/replication/config/importers/remote/publish",
+            authorClient.setProperties("/libs/sling/replication/config/transport/http/http-publish-receive",
                     "endpoints", receiverUrl);
-            assertExists(authorClient, "/libs/sling/replication/agents/publish");
-        } catch (Exception ex) {
-            throw new RuntimeException(ex);
+            assertExists(authorClient, "/libs/sling/replication/agent/publish");
+        }
+        catch (Exception ex) {
+
         }
 
     }

Modified: sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ReplicationUtils.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ReplicationUtils.java?rev=1615577&r1=1615576&r2=1615577&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ReplicationUtils.java (original)
+++ sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ReplicationUtils.java Mon Aug  4 14:36:16 2014
@@ -88,15 +88,17 @@ public class ReplicationUtils {
         ).getContent().replaceAll("\n", "").trim();
 
 
-        for (String parameter : parameters) {
-            assertTrue(parameter + " is not contained in " + content,
-                    content.contains(parameter));
+        for (int i = 0; i < parameters.length; i++) {
+            assertTrue(parameters[i] + " is not contained in " + content,
+                    content.contains(parameters[i])
+            );
         }
     }
 
 
-    public static void replicate(SlingInstance slingInstance, String agentName, ReplicationActionType action, String... paths) throws IOException {
-        String agentResource = agentUrl(agentName);
+    public static void replicate(SlingInstance slingInstance, String agent, ReplicationActionType action, String... paths) throws IOException {
+        String agentResource = agentUrl("publish");
+
 
         List<String> args = new ArrayList<String>();
         args.add(ReplicationHeader.ACTION.toString());
@@ -114,6 +116,7 @@ public class ReplicationUtils {
 
     public static void deleteNode(SlingInstance slingInstance, String path) throws IOException {
         assertPostResourceWithParameters(slingInstance, 200, path, ":operation", "delete");
+
     }
 
     public static void assertExists(SlingClient slingClient, String path) throws Exception {
@@ -130,6 +133,7 @@ public class ReplicationUtils {
         while(slingClient.exists(path) && retries-- > 0) {
             Thread.sleep(1000);
         }
+
         assertTrue(retries > 0);
     }
 
@@ -145,23 +149,23 @@ public class ReplicationUtils {
     }
 
     public static String agentUrl(String agentName) {
-        return REPLICATION_ROOT_PATH + "/agents/" + agentName;
+        return REPLICATION_ROOT_PATH + "/agent/" + agentName;
     }
 
     public static String queueUrl(String agentName) {
-        return REPLICATION_ROOT_PATH + "/agents/" + agentName +"/queue";
+        return REPLICATION_ROOT_PATH + "/agent/" + agentName +"/queue";
     }
 
     public static String agentConfigUrl(String agentName) {
-        return REPLICATION_ROOT_PATH + "/config/agents/" + agentName;
+        return REPLICATION_ROOT_PATH + "/config/agent/" + agentName;
     }
 
 
     public static String importerRootUrl() {
-        return REPLICATION_ROOT_PATH + "/importers";
+        return REPLICATION_ROOT_PATH + "/importer";
     }
 
     public static String importerUrl(String importerName) {
-        return REPLICATION_ROOT_PATH + "/importers/" + importerName;
+        return REPLICATION_ROOT_PATH + "/importer/" + importerName;
     }
 }



Mime
View raw message