sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomm...@apache.org
Subject svn commit: r1615605 [1/2] - in /sling/trunk/contrib/extensions/replication: core/src/main/java/org/apache/sling/replication/agent/impl/ core/src/main/java/org/apache/sling/replication/queue/ core/src/main/java/org/apache/sling/replication/queue/impl/j...
Date Mon, 04 Aug 2014 15:03:08 GMT
Author: tommaso
Date: Mon Aug  4 15:03:06 2014
New Revision: 1615605

URL: http://svn.apache.org/r1615605
Log:
SLING-3816 - applied Marius Petria's patch to use the same flow for forward and reverse replication

Added:
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageExporter.java   (with props)
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/exporter/
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/exporter/AgentReplicationPackageExporter.java   (with props)
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/exporter/DefaultReplicationPackageExporter.java   (with props)
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/importer/
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/importer/DefaultReplicationPackageImporter.java   (with props)
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationPackageExporterServlet.java   (with props)
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/exporter/
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/exporter/RemoteReplicationPackageExporter.java   (with props)
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/importer/
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/importer/RemoteReplicationPackageImporter.java   (with props)
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/author/
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/author/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-publish-reverse.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/author/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-publish.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/author/org.apache.sling.replication.transport.impl.exporter.RemoteReplicationPackageExporter-publish.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/author/org.apache.sling.replication.transport.impl.importer.RemoteReplicationPackageImporter-publish.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/publish/
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/publish/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-cache-flush.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/publish/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-reverse.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/publish/org.apache.sling.replication.serialization.impl.exporter.AgentReplicationPackageExporter.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/publish/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-cache-flush.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/resourceproviders/configs/
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/resourceproviders/configs/org.apache.sling.replication.resources.impl.OsgiPropertiesResourceProviderFactory-agentsConfig.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/resourceproviders/configs/org.apache.sling.replication.resources.impl.OsgiPropertiesResourceProviderFactory-httpTransportConfig.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/resourceproviders/configs/org.apache.sling.replication.resources.impl.OsgiPropertiesResourceProviderFactory-remoteExportersConfig.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/resourceproviders/configs/org.apache.sling.replication.resources.impl.OsgiPropertiesResourceProviderFactory-remoteImportersConfig.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/resourceproviders/services/
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/resourceproviders/services/org.apache.sling.replication.resources.impl.OsgiPropertiesResourceProviderFactory-agents.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/resourceproviders/services/org.apache.sling.replication.resources.impl.OsgiPropertiesResourceProviderFactory-exporters.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/resourceproviders/services/org.apache.sling.replication.resources.impl.OsgiPropertiesResourceProviderFactory-importers.json
    sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ReverseReplicationTest.java   (with props)
Removed:
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporter.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ReplicationPackageAdapterFactory.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/NopTransportHandler.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandler.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandlerFactory.java
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-author.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-cache-flush.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-publish-reverse.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-publish.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-reverserepo.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-http-author-receive.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-http-cache-flush.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-http-publish-receive.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/org.apache.sling.replication.transport.impl.PollingTransportHandlerFactory-http-publish-poll.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/resourceproviders/org.apache.sling.replication.resources.impl.OsgiPropertiesResourceProviderFactory-replicationAgents.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/resourceproviders/org.apache.sling.replication.resources.impl.OsgiPropertiesResourceProviderFactory-replicationAgentsConfig.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/resourceproviders/org.apache.sling.replication.resources.impl.OsgiPropertiesResourceProviderFactory-replicationHttpTransportConfig.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/resourceproviders/org.apache.sling.replication.resources.impl.OsgiPropertiesResourceProviderFactory-replicationImporters.json
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporterTest.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/transport/impl/PollingTransportHandlerTest.java
Modified:
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItem.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/resources/ReplicationConstants.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/resources/impl/common/AbstractReadableResourceProvider.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilder.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageBuilder.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackageBuilder.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationPackageImporterServlet.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/TransportHandler.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandler.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandler.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandler.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactoryTest.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageTest.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerTest.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandlerTest.java
    sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ForwardReplicationTest.java
    sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ReplicationIntegrationTestBase.java
    sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ReplicationUtils.java

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java?rev=1615605&r1=1615604&r2=1615605&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java Mon Aug  4 15:03:06 2014
@@ -41,10 +41,8 @@ import org.apache.sling.replication.queu
 import org.apache.sling.replication.queue.impl.SingleQueueDistributionStrategy;
 import org.apache.sling.replication.queue.impl.jobhandling.JobHandlingReplicationQueueProvider;
 import org.apache.sling.replication.rule.ReplicationRuleEngine;
-import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
-import org.apache.sling.replication.serialization.impl.vlt.FileVaultReplicationPackageBuilder;
-import org.apache.sling.replication.transport.TransportHandler;
-import org.apache.sling.replication.transport.impl.NopTransportHandler;
+import org.apache.sling.replication.serialization.ReplicationPackageExporter;
+import org.apache.sling.replication.serialization.ReplicationPackageImporter;
 import org.apache.sling.settings.SlingSettingsService;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceReference;
@@ -69,16 +67,10 @@ public class ReplicationAgentServiceFact
 
     static final String SERVICE_PID = "org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory";
 
-    private static final String TRANSPORT = ReplicationAgentConfiguration.TRANSPORT;
-
     private static final String QUEUEPROVIDER = ReplicationAgentConfiguration.QUEUEPROVIDER;
 
-    private static final String PACKAGING = ReplicationAgentConfiguration.PACKAGING;
-
     private static final String QUEUE_DISTRIBUTION = ReplicationAgentConfiguration.QUEUE_DISTRIBUTION;
 
-    private static final String DEFAULT_PACKAGING = "(name="
-            + FileVaultReplicationPackageBuilder.NAME + ")";
 
     private static final String DEFAULT_QUEUEPROVIDER = "(name="
             + JobHandlingReplicationQueueProvider.NAME + ")";
@@ -86,9 +78,6 @@ public class ReplicationAgentServiceFact
     private static final String DEFAULT_DISTRIBUTION = "(name="
             + SingleQueueDistributionStrategy.NAME + ")";
 
-    private static final String DEFAULT_TRANSPORT = "(name="
-            + NopTransportHandler.NAME + ")";
-
     @Property(boolValue = true, label = "Enabled")
     private static final String ENABLED = ReplicationAgentConfiguration.ENABLED;
 
@@ -101,13 +90,13 @@ public class ReplicationAgentServiceFact
     @Property(boolValue = true, label = "Replicate using aggregated paths")
     private static final String USE_AGGREGATE_PATHS = ReplicationAgentConfiguration.USE_AGGREGATE_PATHS;
 
-    @Property(label = "Target TransportHandler", name = TRANSPORT, value = "(name=" + NopTransportHandler.NAME + ")")
-    @Reference(name = "TransportHandler", target = "(name=" + NopTransportHandler.NAME + ")", policy = ReferencePolicy.DYNAMIC)
-    private volatile TransportHandler transportHandler;
-
-    @Property(label = "Target ReplicationPackageBuilder", name = PACKAGING, value = DEFAULT_PACKAGING)
-    @Reference(name = "ReplicationPackageBuilder", target = DEFAULT_PACKAGING, policy = ReferencePolicy.DYNAMIC)
-    private volatile ReplicationPackageBuilder packageBuilder;
+    @Property(label = "Target ReplicationPackageExporter", name = "ReplicationPackageExporter.target", value = "(name=vlt)")
+    @Reference(name = "ReplicationPackageExporter", target = "(name=vlt)", policy = ReferencePolicy.DYNAMIC)
+    private ReplicationPackageExporter packageExporter;
+
+    @Property(label = "Target ReplicationPackageImporter", name = "ReplicationPackageImporter.target", value = "(name=default)")
+    @Reference(name = "ReplicationPackageImporter", target = "(name=default)", policy = ReferencePolicy.DYNAMIC)
+    private ReplicationPackageImporter packageImporter;
 
     @Property(label = "Target ReplicationQueueProvider", name = QUEUEPROVIDER, value = DEFAULT_QUEUEPROVIDER)
     @Reference(name = "ReplicationQueueProvider", target = DEFAULT_QUEUEPROVIDER, policy = ReferencePolicy.DYNAMIC)
@@ -149,11 +138,6 @@ public class ReplicationAgentServiceFact
                     .toString(config.get(NAME), String.valueOf(new Random().nextInt(1000)));
             props.put(NAME, name);
 
-            String transport = PropertiesUtil.toString(config.get(TRANSPORT), "");
-            props.put(TRANSPORT, transport);
-
-            String packaging = PropertiesUtil.toString(config.get(PACKAGING), "");
-            props.put(PACKAGING, packaging);
 
             String queue = PropertiesUtil.toString(config.get(QUEUEPROVIDER), "");
             props.put(QUEUEPROVIDER, queue);
@@ -168,19 +152,22 @@ public class ReplicationAgentServiceFact
             boolean useAggregatePaths = PropertiesUtil.toBoolean(config.get(USE_AGGREGATE_PATHS), true);
             props.put(USE_AGGREGATE_PATHS, useAggregatePaths);
 
+            boolean isPassive = PropertiesUtil.toBoolean(config.get("isPassive"), false);
+            props.put(USE_AGGREGATE_PATHS, useAggregatePaths);
+
             // check configuration is valid
-            if (name == null || packageBuilder == null || queueProvider == null || queueDistributionStrategy == null) {
+            if (name == null || packageExporter == null || packageImporter == null || queueProvider == null || queueDistributionStrategy == null) {
                 throw new AgentConfigurationException("configuration for this agent is not valid");
             }
 
 
             if (log.isInfoEnabled()) {
                 log.info("bound services for {} :  {} - {} - {} - {} - {} - {}", new Object[]{name,
-                        transportHandler, packageBuilder, queueProvider, queueDistributionStrategy});
+                        packageImporter, packageExporter, queueProvider, queueDistributionStrategy});
             }
 
-            ReplicationAgent agent = new SimpleReplicationAgent(name, rules, useAggregatePaths,
-                    transportHandler, packageBuilder, queueProvider, queueDistributionStrategy, replicationEventFactory, replicationRuleEngine);
+            ReplicationAgent agent = new SimpleReplicationAgent(name, rules, useAggregatePaths, isPassive,
+                    packageImporter, packageExporter, queueProvider, queueDistributionStrategy, replicationEventFactory, replicationRuleEngine);
 
 
             // only enable if instance runmodes match configured ones

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java?rev=1615605&r1=1615604&r2=1615605&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java Mon Aug  4 15:03:06 2014
@@ -18,8 +18,6 @@
  */
 package org.apache.sling.replication.agent.impl;
 
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Dictionary;
 import java.util.List;
@@ -39,12 +37,10 @@ import org.apache.sling.replication.queu
 import org.apache.sling.replication.queue.ReplicationQueueProvider;
 import org.apache.sling.replication.rule.ReplicationRuleEngine;
 import org.apache.sling.replication.serialization.ReplicationPackage;
-import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
 import org.apache.sling.replication.serialization.ReplicationPackageBuildingException;
+import org.apache.sling.replication.serialization.ReplicationPackageExporter;
+import org.apache.sling.replication.serialization.ReplicationPackageImporter;
 import org.apache.sling.replication.serialization.ReplicationPackageReadingException;
-import org.apache.sling.replication.transport.ReplicationTransportException;
-import org.apache.sling.replication.transport.TransportHandler;
-import org.apache.sling.replication.transport.impl.NopTransportHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,15 +49,13 @@ import org.slf4j.LoggerFactory;
  */
 public class SimpleReplicationAgent implements ReplicationAgent {
 
-    private final static String RESPONSE_QUEUE = "response";
-
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private final ReplicationPackageBuilder packageBuilder;
-
     private final ReplicationQueueProvider queueProvider;
 
-    private final TransportHandler transportHandler;
+    private final boolean passive;
+    private final ReplicationPackageImporter replicationPackageImporter;
+    private final ReplicationPackageExporter replicationPackageExporter;
 
     private final ReplicationQueueDistributionStrategy queueDistributionStrategy;
 
@@ -77,15 +71,17 @@ public class SimpleReplicationAgent impl
 
     public SimpleReplicationAgent(String name, String[] rules,
                                   boolean useAggregatePaths,
-                                  TransportHandler transportHandler,
-                                  ReplicationPackageBuilder packageBuilder,
+                                  boolean passive,
+                                  ReplicationPackageImporter replicationPackageImporter,
+                                  ReplicationPackageExporter replicationPackageExporter,
                                   ReplicationQueueProvider queueProvider,
                                   ReplicationQueueDistributionStrategy queueDistributionHandler,
                                   ReplicationEventFactory replicationEventFactory, ReplicationRuleEngine ruleEngine) {
         this.name = name;
         this.rules = rules;
-        this.transportHandler = transportHandler;
-        this.packageBuilder = packageBuilder;
+        this.passive = passive;
+        this.replicationPackageImporter = replicationPackageImporter;
+        this.replicationPackageExporter = replicationPackageExporter;
         this.queueProvider = queueProvider;
         this.queueDistributionStrategy = queueDistributionHandler;
         this.useAggregatePaths = useAggregatePaths;
@@ -110,7 +106,7 @@ public class SimpleReplicationAgent impl
     }
 
     public boolean isPassive() {
-        return transportHandler == null || transportHandler instanceof NopTransportHandler; // TODO : improve this
+        return passive;
     }
 
 
@@ -118,7 +114,7 @@ public class SimpleReplicationAgent impl
         // create package from request
         ReplicationPackage replicationPackage;
         try {
-            replicationPackage = packageBuilder.createPackage(replicationRequest);
+            replicationPackage = replicationPackageExporter.exportPackage(replicationRequest);
         } catch (ReplicationPackageBuildingException e) {
             throw new AgentReplicationException(e);
         }
@@ -216,7 +212,7 @@ public class SimpleReplicationAgent impl
             ReplicationQueueItem info = queue.getHead();
             if (info != null) {
                 queue.removeHead();
-                replicationPackage = packageBuilder.getPackage(info.getId());
+                replicationPackage = replicationPackageExporter.exportPackageById(info.getId());
             }
             return replicationPackage;
         } else {
@@ -248,7 +244,6 @@ public class SimpleReplicationAgent impl
 
         if (!isPassive()) {
             queueProvider.enableQueueProcessing(getName(), new PackageQueueProcessor());
-            transportHandler.enableProcessing(getName(), new ResponseProcessor());
         }
     }
 
@@ -260,15 +255,14 @@ public class SimpleReplicationAgent impl
 
         if (!isPassive()) {
             queueProvider.disableQueueProcessing(getName());
-            transportHandler.disableProcessing(getName());
         }
     }
 
-    private boolean processResponseQueue(ReplicationQueueItem queueItem) {
-        InputStream stream = new ByteArrayInputStream(queueItem.getBytes());
-        log.debug("reading package from stream {}", stream);
+    private boolean processTransportQueue(ReplicationQueueItem queueItem) {
+        log.debug("reading package from id {}", queueItem.getId());
         try {
-            ReplicationPackage replicationPackage = packageBuilder.readPackage(stream, true);
+            ReplicationPackage replicationPackage = replicationPackageExporter.exportPackageById(queueItem.getId());
+            replicationPackageImporter.importPackage(replicationPackage);
             replicationPackage.delete();
             return true;
         } catch (ReplicationPackageReadingException e) {
@@ -276,46 +270,11 @@ public class SimpleReplicationAgent impl
         }
     }
 
-    private boolean processTransportQueue(ReplicationQueueItem queueItem) {
-        try {
-            ReplicationPackage replicationPackage = packageBuilder.getPackage(queueItem.getId());
-            if (replicationPackage == null) {
-                return false;
-            }
-            if (transportHandler != null) {
-                transportHandler.transport(getName(), replicationPackage);
-                replicationPackage.delete();
-                return true;
-            } else {
-                log.info("agent {} processing skipped", name);
-                return false;
-            }
-        } catch (ReplicationTransportException e) {
-            log.error("transport error", e);
-            return false;
-        }
-    }
-
     class PackageQueueProcessor implements ReplicationQueueProcessor {
         public boolean process(String queueName, ReplicationQueueItem packageInfo) {
             log.info("running package queue processor");
-            if (RESPONSE_QUEUE.equalsIgnoreCase(queueName)) {
-                return processResponseQueue(packageInfo);
-            } else {
-                return processTransportQueue(packageInfo);
-            }
-        }
-    }
 
-    class ResponseProcessor implements ReplicationQueueProcessor {
-        public boolean process(String queueName, ReplicationQueueItem queueItem) {
-            log.info("running response processor");
-            try {
-                return getQueue(RESPONSE_QUEUE).add(queueItem);
-            } catch (Exception e) {
-                return false;
-            }
+            return processTransportQueue(packageInfo);
         }
     }
-
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItem.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItem.java?rev=1615605&r1=1615604&r2=1615605&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItem.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItem.java Mon Aug  4 15:03:06 2014
@@ -31,22 +31,12 @@ public class ReplicationQueueItem {
 
     private final String type;
 
-    private final byte[] bytes;
 
-    private ReplicationQueueItem(String id, String[] paths, String action, String type, byte[] bytes) {
+    public ReplicationQueueItem(String id, String[] paths, String action, String type) {
         this.id = id;
         this.paths = paths;
         this.action = action;
         this.type = type;
-        this.bytes = bytes;
-    }
-
-    public ReplicationQueueItem(String id, String[] paths, String action, String type) {
-        this(id, paths, action, type, null);
-    }
-
-    public ReplicationQueueItem(String[] paths, String action, String type, byte[] bytes) {
-        this(null, paths, action, type, bytes);
     }
 
     public String getId() {
@@ -64,12 +54,4 @@ public class ReplicationQueueItem {
     public String getType() {
         return type;
     }
-
-    public byte[] getBytes() {
-        return bytes;
-    }
-
-    public boolean isTransient(){
-        return id == null;
-    }
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java?rev=1615605&r1=1615604&r2=1615605&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java Mon Aug  4 15:03:06 2014
@@ -38,37 +38,22 @@ public class JobHandlingUtils {
 
     protected static final String ACTION = "replication.package.action";
 
-    protected static final String BYTES = "replication.package.bytes";
-
-
     public static ReplicationQueueItem getPackage(final Job job) {
-        String id = (String) job.getProperty(ID);
-        if (id != null) {
-            return new ReplicationQueueItem((String) job.getProperty(ID),
-                    (String[]) job.getProperty(PATHS),
-                    String.valueOf(job.getProperty(ACTION)),
-                    String.valueOf(job.getProperty(TYPE)));
-        } else {
-            return new ReplicationQueueItem((String[]) job.getProperty(PATHS),
-                    String.valueOf(job.getProperty(ACTION)),
-                    String.valueOf(job.getProperty(TYPE)),
-                    unBox((Byte[]) job.getProperty(BYTES)));
-        }
+        return new ReplicationQueueItem((String) job.getProperty(ID),
+                (String[]) job.getProperty(PATHS),
+                String.valueOf(job.getProperty(ACTION)),
+                String.valueOf(job.getProperty(TYPE)));
     }
 
     public static Map<String, Object> createFullPropertiesFromPackage(
             ReplicationQueueItem replicationPackage) {
         Map<String, Object> properties = new HashMap<String, Object>();
 
-        if (replicationPackage.getId() != null)
-            properties.put(ID, replicationPackage.getId());
+        properties.put(ID, replicationPackage.getId());
         properties.put(PATHS, replicationPackage.getPaths());
         properties.put(ACTION, replicationPackage.getAction());
         properties.put(TYPE, replicationPackage.getType());
 
-        if (replicationPackage.getBytes() != null)
-            properties.put(BYTES, box(replicationPackage.getBytes()));
-
         return properties;
     }
 

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/resources/ReplicationConstants.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/resources/ReplicationConstants.java?rev=1615605&r1=1615604&r2=1615605&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/resources/ReplicationConstants.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/resources/ReplicationConstants.java Mon Aug  4 15:03:06 2014
@@ -20,13 +20,17 @@
 package org.apache.sling.replication.resources;
 
 public class ReplicationConstants {
+    public static final String SUFFIX_RESOURCE_LIST = "/list";
+    public static final String SUFFIX_AGENT_QUEUE = "/queue";
+    public static final String SUFFIX_AGENT_QUEUE_EVENT = "/queue/event";
+
     public static final String  AGENT_RESOURCE_TYPE = "replication/agent";
-    public static final String  AGENT_QUEUE_RESOURCE_TYPE = "replication/agent/queue";
-    public static final String  AGENT_QUEUE_EVENT_RESOURCE_TYPE = "replication/agent/queue";
-    public static final String  AGENT_ROOT_RESOURCE_TYPE = "replication/agents";
-    public static final String  IMPORTER_ROOT_RESOURCE_TYPE = "replication/importers";
+    public static final String  AGENT_QUEUE_RESOURCE_TYPE = AGENT_RESOURCE_TYPE + SUFFIX_AGENT_QUEUE;
+    public static final String  AGENT_QUEUE_EVENT_RESOURCE_TYPE = AGENT_QUEUE_RESOURCE_TYPE + SUFFIX_AGENT_QUEUE_EVENT;
+    public static final String  AGENT_ROOT_RESOURCE_TYPE = AGENT_RESOURCE_TYPE + SUFFIX_RESOURCE_LIST;
+
     public static final String  IMPORTER_RESOURCE_TYPE = "replication/importer";
+    public static final String  EXPORTER_RESOURCE_TYPE = "replication/exporter";
+
 
-    public static final String SUFFIX_AGENT_QUEUE = "/queue";
-    public static final String SUFFIX_AGENT_QUEUE_EVENT = "/queue/event";
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/resources/impl/common/AbstractReadableResourceProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/resources/impl/common/AbstractReadableResourceProvider.java?rev=1615605&r1=1615604&r2=1615605&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/resources/impl/common/AbstractReadableResourceProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/resources/impl/common/AbstractReadableResourceProvider.java Mon Aug  4 15:03:06 2014
@@ -36,7 +36,6 @@ public abstract class AbstractReadableRe
     protected static final String ADAPTABLE_PROPERTY_NAME = "adaptable";
 
     private static final String MAIN_RESOURCE_PREFIX = ".";
-    private static final String ROOT_RESOURCE_PREFIX = "..";
 
     private final String resourceRoot;
     private final Map<String, Map<String, String>> additionalResourcePropertiesMap = new HashMap<String, Map<String, String>>();
@@ -46,12 +45,11 @@ public abstract class AbstractReadableRe
 
         this.resourceRoot = resourceRoot;
 
-        additionalResourcePropertiesMap.put(ROOT_RESOURCE_PREFIX, new HashMap<String, String>());
         additionalResourcePropertiesMap.put(MAIN_RESOURCE_PREFIX, new HashMap<String, String>());
         for (Map.Entry<String, String> entry : additionalResourceProperties.entrySet()) {
             String resourceName = MAIN_RESOURCE_PREFIX;
             String propertyName = entry.getKey();
-            int idx =propertyName.indexOf("/");
+            int idx = propertyName.indexOf("/");
             if (idx >=0) {
                 resourceName = propertyName.substring(0, idx);
                 propertyName = propertyName.substring(idx+1);
@@ -93,8 +91,10 @@ public abstract class AbstractReadableRe
             if (properties != null) {
                 Object adaptable = properties.remove(ADAPTABLE_PROPERTY_NAME);
 
-                Map<String, String > additionalProperties = additionalResourcePropertiesMap.get(ROOT_RESOURCE_PREFIX);
-                properties.putAll(additionalProperties);
+                Map<String, String> additionalProperties = additionalResourcePropertiesMap.get(MAIN_RESOURCE_PREFIX);
+                if (!properties.containsKey("sling:resourceType") && additionalProperties.containsKey("sling:resourceType")) {
+                    properties.put("sling:resourceType", additionalProperties.get("sling:resourceType") +"/list");
+                }
 
                 resource = new SimpleReadableResource(resourceResolver, pathInfo.getResourcePath(), properties, adaptable);
             }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilder.java?rev=1615605&r1=1615604&r2=1615605&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilder.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilder.java Mon Aug  4 15:03:06 2014
@@ -40,13 +40,11 @@ public interface ReplicationPackageBuild
      * reads a stream and tries to convert it to a {@link ReplicationPackage} this provider can read and install
      *
      * @param stream  the {@link InputStream} of the package to read
-     * @param install if <code>true</code> then if the package can be read from the stream then it will try also
-     *                to install it into the repository
      * @return a {@link ReplicationPackage} if it can read it from the stream
      * @throws ReplicationPackageReadingException
      *          when the stream cannot be read as a {@link ReplicationPackage}
      */
-    ReplicationPackage readPackage(InputStream stream, boolean install) throws ReplicationPackageReadingException;
+    ReplicationPackage readPackage(InputStream stream) throws ReplicationPackageReadingException;
 
     /**
      * get an already created (and saved into the repository) {@link ReplicationPackage} by its id
@@ -56,4 +54,12 @@ public interface ReplicationPackageBuild
      */
     ReplicationPackage getPackage(String id);
 
+    /**
+     * Installs the given replicationPackage into the repository
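+     * @param replicationPackage the {@link ReplicationPackage} to install
+     * @return <code>true</code> if the package was installed, <code>false</code> otherwise
+     * @throws ReplicationPackageReadingException if the package cannot be installed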
+     */
+    boolean installPackage(ReplicationPackage replicationPackage) throws ReplicationPackageReadingException;
+
 }

Added: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageExporter.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageExporter.java?rev=1615605&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageExporter.java (added)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageExporter.java Mon Aug  4 15:03:06 2014
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.serialization;
+
+
+import org.apache.sling.replication.communication.ReplicationRequest;
+
+/**
+ * A {@link org.apache.sling.replication.serialization.ReplicationPackage} exporter
+ */
+public interface ReplicationPackageExporter {
+    /**
+     * Exports a replication package.
+     * @return the first available package in the exporter.
+     */
+    ReplicationPackage exportPackage(ReplicationRequest replicationRequest) throws ReplicationPackageBuildingException;
+
+    /**
+     * Exports the replication package with the given id.
+     * @return the package identified by <code>replicationPackageId</code>, or <code>null</code> if this exporter cannot provide it.
+     */
+    ReplicationPackage exportPackageById(String replicationPackageId);
+}

Propchange: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageExporter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java?rev=1615605&r1=1615604&r2=1615605&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java Mon Aug  4 15:03:06 2014
@@ -24,14 +24,21 @@ import java.io.InputStream;
  * A {@link org.apache.sling.replication.serialization.ReplicationPackage} importer
  */
 public interface ReplicationPackageImporter {
+    /**
+     * Imports the given replication package
+     * @param replicationPackage - the package to be imported
+     * @return
+     */
+    boolean importPackage(ReplicationPackage replicationPackage) throws ReplicationPackageReadingException;
 
     /**
-     * Synchronously import the stream of a {@link org.apache.sling.replication.serialization.ReplicationPackage}
+     * reads a stream and tries to convert it to a {@link ReplicationPackage} this provider can read and install
      *
-     * @param stream the <code>InputStream</code> of the given <code>ReplicationPackage</code>
-     * @param type   the <code>String</code> representing the ({@link ReplicationPackage#getType() type} of the given package
-     * @return <code>true</code> if successfully imported, <code>false</code> otherwise
+     * @param stream  the {@link InputStream} of the package to read
+     * @return a {@link ReplicationPackage} if it can read it from the stream
+     * @throws ReplicationPackageReadingException
+     *          when the stream cannot be read as a {@link ReplicationPackage}
      */
-    boolean importStream(InputStream stream, String type);
+    ReplicationPackage readPackage(InputStream stream) throws ReplicationPackageReadingException;
 
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java?rev=1615605&r1=1615604&r2=1615605&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java Mon Aug  4 15:03:06 2014
@@ -60,8 +60,7 @@ public abstract class AbstractReplicatio
     protected abstract ReplicationPackage createPackageForAdd(ReplicationRequest request)
             throws ReplicationPackageBuildingException;
 
-    public ReplicationPackage readPackage(InputStream stream,
-                                          boolean install) throws ReplicationPackageReadingException {
+    public ReplicationPackage readPackage(InputStream stream) throws ReplicationPackageReadingException {
         ReplicationPackage replicationPackage = null;
         if (!stream.markSupported()) {
             stream = new BufferedInputStream(stream);
@@ -72,28 +71,35 @@ public abstract class AbstractReplicatio
             int bytesRead = stream.read(buffer, 0, 6);
             stream.reset();
             String s = new String(buffer, "UTF-8");
-            if (log.isInfoEnabled()) {
-                log.info("read {} bytes as {}", bytesRead, s);
-            }
+            log.info("read {} bytes as {}", bytesRead, s);
+
             if (bytesRead > 0 && buffer[0] > 0 && s.startsWith("DEL")) {
-                replicationPackage = readPackageForDelete(stream);
+                replicationPackage = VoidReplicationPackage.fromStream(stream);
             }
         } catch (Exception e) {
             log.warn("cannot parse stream", e);
         }
         stream.mark(-1);
         if (replicationPackage == null) {
-            replicationPackage = readPackageForAdd(stream, install);
+            replicationPackage = readPackageInternal(stream);
         }
         return replicationPackage;
     }
 
-    private ReplicationPackage readPackageForDelete(InputStream stream) throws ReplicationPackageReadingException {
-        ReplicationPackage replicationPackage = null;
+
+    public boolean installPackage(ReplicationPackage replicationPackage) throws ReplicationPackageReadingException {
+        ReplicationActionType actionType = ReplicationActionType.fromName(replicationPackage.getAction());
+        if (ReplicationActionType.DELETE.equals(actionType)) {
+            return installDeletePackage(replicationPackage);
+        }
+
+        return installPackageInternal(replicationPackage);
+
+    }
+
+    private boolean installDeletePackage(ReplicationPackage replicationPackage) throws ReplicationPackageReadingException {
         Session session = null;
         try {
-            replicationPackage = VoidReplicationPackage.fromStream(stream);
-
             if(replicationPackage != null){
                 session = getSession();
                 for (String path : replicationPackage.getPaths()) {
@@ -102,6 +108,7 @@ public abstract class AbstractReplicatio
                     }
                 }
                 session.save();
+                return true;
             }
         } catch (Exception e) {
             throw new ReplicationPackageReadingException(e);
@@ -111,7 +118,7 @@ public abstract class AbstractReplicatio
             }
         }
 
-        return replicationPackage;
+        return false;
     }
 
     public ReplicationPackage getPackage(String id) {
@@ -133,10 +140,13 @@ public abstract class AbstractReplicatio
 
     protected abstract Session getSession() throws RepositoryException;
 
-    protected abstract ReplicationPackage readPackageForAdd(InputStream stream, boolean install)
+    protected abstract ReplicationPackage readPackageInternal(InputStream stream)
             throws ReplicationPackageReadingException;
 
 
+    protected abstract boolean installPackageInternal(ReplicationPackage replicationPackage)
+            throws ReplicationPackageReadingException;
+
     protected abstract ReplicationPackage getPackageInternal(String id);
 
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java?rev=1615605&r1=1615604&r2=1615605&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java Mon Aug  4 15:03:06 2014
@@ -48,7 +48,7 @@ public class VoidReplicationPackage impl
         this.paths = request.getPaths();
         this.action = request.getAction().toString();
         this.id = request.getAction().toString()
-                + ':' + Arrays.toString(request.getPaths())
+                + ':' + Arrays.toString(request.getPaths()).replaceAll("\\[", "").replaceAll("\\]", "")
                 + ':' + request.getTime()
                 + ':' + type;
     }
@@ -70,7 +70,7 @@ public class VoidReplicationPackage impl
         VoidReplicationPackage replicationPackage = null;
         if(replicationActionType != null){
             pathsString = Text.unescape(pathsString);
-            String[] paths = pathsString.replaceAll("\\[", "").replaceAll("\\]", "").split(", ");
+            String[] paths = pathsString.split(", ");
 
             ReplicationRequest request = new ReplicationRequest(Long.valueOf(timeString),
                     replicationActionType, paths);

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageBuilder.java?rev=1615605&r1=1615604&r2=1615605&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageBuilder.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageBuilder.java Mon Aug  4 15:03:06 2014
@@ -41,7 +41,7 @@ public class VoidReplicationPackageBuild
         return new VoidReplicationPackage(request, "VOID");
     }
 
-    public ReplicationPackage readPackage(InputStream stream, boolean install) throws ReplicationPackageReadingException {
+    public ReplicationPackage readPackage(InputStream stream) throws ReplicationPackageReadingException {
         try {
             return VoidReplicationPackage.fromStream(stream);
         } catch (Exception e) {
@@ -57,4 +57,8 @@ public class VoidReplicationPackageBuild
             return null;
         }
     }
+
+    public boolean installPackage(ReplicationPackage replicationPackage) {
+        return false;
+    }
 }

Added: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/exporter/AgentReplicationPackageExporter.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/exporter/AgentReplicationPackageExporter.java?rev=1615605&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/exporter/AgentReplicationPackageExporter.java (added)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/exporter/AgentReplicationPackageExporter.java Mon Aug  4 15:03:06 2014
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.serialization.impl.exporter;
+
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.communication.ReplicationRequest;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
+import org.apache.sling.replication.serialization.ReplicationPackageExporter;
+import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+@Component(label = "Agent Based Replication Package Exporter")
+@Service(value = ReplicationPackageExporter.class)
+@Property(name = "name", value = AgentReplicationPackageExporter.NAME)
+public class AgentReplicationPackageExporter implements ReplicationPackageExporter {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    public static final String NAME = "agent";
+
+    @Property(label = "Queue")
+    private static final String QUEUE_NAME = "queue";
+
+    @Property(label = "Target ReplicationAgent", name = "ReplicationAgent.target", value = "(name=reverse)")
+    @Reference(name = "ReplicationAgent", target = "(name=reverse)", policy = ReferencePolicy.STATIC)
+    private ReplicationAgent agent;
+
+    @Property(label = "Target ReplicationPackageBuilder", name = "ReplicationPackageBuilder.target", value = "(name=vlt)")
+    @Reference(name = "ReplicationPackageBuilder", target = "(name=vlt)", policy = ReferencePolicy.STATIC)
+    private ReplicationPackageBuilder replicationPackageBuilder;
+
+
+    private String queueName;
+
+
+
+    @Activate
+    public void activate(BundleContext context, Map<String, ?> config) throws Exception {
+        queueName = PropertiesUtil.toString(config.get(QUEUE_NAME), "");
+    }
+
+
+    public ReplicationPackage exportPackage(ReplicationRequest replicationRequest) {
+
+        try {
+            log.info("getting item from queue {}", queueName);
+
+            // get first item
+            ReplicationPackage head = agent.removeHead(queueName);
+            return head;
+        }
+        catch (Exception ex) {
+            log.error("Error exporting package", ex);
+        }
+
+        return null;
+    }
+
+    public ReplicationPackage exportPackageById(String replicationPackageId) {
+        return null;
+    }
+}

Propchange: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/exporter/AgentReplicationPackageExporter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/exporter/DefaultReplicationPackageExporter.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/exporter/DefaultReplicationPackageExporter.java?rev=1615605&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/exporter/DefaultReplicationPackageExporter.java (added)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/exporter/DefaultReplicationPackageExporter.java Mon Aug  4 15:03:06 2014
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.serialization.impl.exporter;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.communication.ReplicationRequest;
+import org.apache.sling.replication.event.ReplicationEventFactory;
+import org.apache.sling.replication.event.ReplicationEventType;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
+import org.apache.sling.replication.serialization.ReplicationPackageBuilderProvider;
+import org.apache.sling.replication.serialization.ReplicationPackageBuildingException;
+import org.apache.sling.replication.serialization.ReplicationPackageExporter;
+import org.apache.sling.replication.serialization.ReplicationPackageImporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+/**
+ * Default implementation of {@link org.apache.sling.replication.serialization.ReplicationPackageExporter}
+ */
+@Component(label = "Default Replication Package Exporter")
+@Service(value = ReplicationPackageExporter.class)
+@Property(name = "name", value = DefaultReplicationPackageExporter.NAME)
+public class DefaultReplicationPackageExporter implements ReplicationPackageExporter {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Property(label = "Name")
+    public static final String NAME = "default";
+
+    @Reference(name = "ReplicationPackageBuilder", target = "(name=vlt)", policy = ReferencePolicy.STATIC)
+    private ReplicationPackageBuilder packageBuilder;
+
+    public ReplicationPackage exportPackage(ReplicationRequest replicationRequest) throws ReplicationPackageBuildingException{
+        return packageBuilder.createPackage(replicationRequest);
+    }
+
+    public ReplicationPackage exportPackageById(String replicationPackageId) {
+        return packageBuilder.getPackage(replicationPackageId);
+    }
+}

Propchange: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/exporter/DefaultReplicationPackageExporter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/importer/DefaultReplicationPackageImporter.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/importer/DefaultReplicationPackageImporter.java?rev=1615605&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/importer/DefaultReplicationPackageImporter.java (added)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/importer/DefaultReplicationPackageImporter.java Mon Aug  4 15:03:06 2014
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.serialization.impl.importer;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import org.apache.felix.scr.annotations.*;
+import org.apache.sling.replication.event.ReplicationEventFactory;
+import org.apache.sling.replication.event.ReplicationEventType;
+import org.apache.sling.replication.serialization.*;
+import org.apache.sling.replication.serialization.impl.vlt.FileVaultReplicationPackageBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default implementation of {@link org.apache.sling.replication.serialization.ReplicationPackageImporter}
+ */
+@Component(label = "Default Replication Package Importer")
+@Service(value = ReplicationPackageImporter.class)
+@Property(name = "name", value = DefaultReplicationPackageImporter.NAME)
+public class DefaultReplicationPackageImporter implements ReplicationPackageImporter {
+
+    public static final String NAME = "default";
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Reference(name = "ReplicationPackageBuilder",
+            target = "(name=" + FileVaultReplicationPackageBuilder.NAME + ")",
+            policy = ReferencePolicy.DYNAMIC)
+    private ReplicationPackageBuilder replicationPackageBuilder;
+
+    @Reference
+    private ReplicationEventFactory replicationEventFactory;
+
+    /** Installs the package via the package builder; on success fires a PACKAGE_INSTALLED event and deletes
+     *  the package. Returns false for a null package or when the installation did not succeed. */
+    public boolean importPackage(ReplicationPackage replicationPackage) {
+        if (replicationPackage == null) {
+            // readPackage() yields null when reading failed; bail out instead of dereferencing it below
+            log.warn("could not read a replication package");
+            return false;
+        }
+        boolean success = false;
+        try {
+            success = replicationPackageBuilder.installPackage(replicationPackage);
+            if (success) {
+                log.info("replication package read and installed for path(s) {}", Arrays.toString(replicationPackage.getPaths()));
+                Dictionary<String, Object> dictionary = new Hashtable<String, Object>();
+                dictionary.put("replication.action", replicationPackage.getAction());
+                dictionary.put("replication.path", replicationPackage.getPaths());
+                replicationEventFactory.generateEvent(ReplicationEventType.PACKAGE_INSTALLED, dictionary);
+                replicationPackage.delete();
+            } else {
+                log.warn("could not read a replication package");
+            }
+        } catch (Exception e) {
+            // pass the exception as last argument so the cause and its stack trace reach the log
+            log.error("cannot import a package from the given stream of type {}", replicationPackage.getType(), e);
+        }
+        return success;
+    }
+
+    /** Reads a package from the stream via the package builder; logs the cause and returns null on failure. */
+    public ReplicationPackage readPackage(InputStream stream) throws ReplicationPackageReadingException {
+        try {
+            return replicationPackageBuilder.readPackage(stream);
+        } catch (Exception e) {
+            log.error("cannot read a package from the given stream", e);
+            return null;
+        }
+    }
+}

Propchange: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/importer/DefaultReplicationPackageImporter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackageBuilder.java?rev=1615605&r1=1615604&r2=1615605&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackageBuilder.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackageBuilder.java Mon Aug  4 15:03:06 2014
@@ -34,10 +34,7 @@ import org.apache.jackrabbit.vault.fs.ap
 import org.apache.jackrabbit.vault.fs.config.DefaultMetaInf;
 import org.apache.jackrabbit.vault.fs.config.DefaultWorkspaceFilter;
 import org.apache.jackrabbit.vault.fs.io.ImportOptions;
-import org.apache.jackrabbit.vault.packaging.ExportOptions;
-import org.apache.jackrabbit.vault.packaging.JcrPackage;
-import org.apache.jackrabbit.vault.packaging.Packaging;
-import org.apache.jackrabbit.vault.packaging.VaultPackage;
+import org.apache.jackrabbit.vault.packaging.*;
 import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.jcr.api.SlingRepository;
 import org.apache.sling.replication.communication.ReplicationRequest;
@@ -138,7 +135,7 @@ public class FileVaultReplicationPackage
     }
 
     @Override
-    protected ReplicationPackage readPackageForAdd(final InputStream stream, boolean install)
+    protected ReplicationPackage readPackageInternal(final InputStream stream)
             throws ReplicationPackageReadingException {
         if (log.isDebugEnabled()) {
             log.debug("reading a stream");
@@ -150,9 +147,7 @@ public class FileVaultReplicationPackage
             if (session != null) {
                 final JcrPackage jcrPackage = packaging.getPackageManager(session).upload(stream, true,
                         false);
-                if (install) {
-                    jcrPackage.install(new ImportOptions());
-                }
+
                 pkg = new FileVaultReplicationPackage(jcrPackage.getPackage());
             }
         } catch (Exception e) {
@@ -166,6 +161,7 @@ public class FileVaultReplicationPackage
         return pkg;
     }
 
+
     @Override
     protected ReplicationPackage getPackageInternal(String id) {
         ReplicationPackage replicationPackage = null;
@@ -175,6 +171,10 @@ public class FileVaultReplicationPackage
                 VaultPackage pkg = packaging.getPackageManager().open(file);
                 replicationPackage = new FileVaultReplicationPackage(pkg);
             }
+            else {
+                VaultPackage pkg = packaging.getPackageManager(getSession()).open(PackageId.fromString(id)).getPackage();
+                replicationPackage = new FileVaultReplicationPackage(pkg);
+            }
         } catch (Exception e) {
             log.info("could not find a package with id : {}", id);
         }
@@ -189,4 +189,28 @@ public class FileVaultReplicationPackage
     }
 
 
+    /** Installs the stored package that has the given package's id; returns true once it has been installed. */
+    @Override
+    public boolean installPackageInternal(ReplicationPackage replicationPackage) throws ReplicationPackageReadingException{
+        log.debug("installing a package");
+        Session session = null;
+        try {
+            session = getSession();
+            if (session != null) {
+                // use the session that the finally block logs out rather than calling getSession() a second time
+                final JcrPackage jcrPackage = packaging.getPackageManager(session)
+                        .open(PackageId.fromString(replicationPackage.getId()));
+                jcrPackage.install(new ImportOptions());
+                return true;
+            }
+        } catch (Exception e) {
+            log.error("could not read / install the package", e);
+            throw new ReplicationPackageReadingException(e);
+        } finally {
+            if (session != null) {
+                session.logout();
+            }
+        }
+        return false;
+    }
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java?rev=1615605&r1=1615604&r2=1615605&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java Mon Aug  4 15:03:06 2014
@@ -62,18 +62,6 @@ public class ReplicationAgentServlet ext
     @Override
     protected void doPost(SlingHttpServletRequest request, SlingHttpServletResponse response)
             throws ServletException, IOException {
-        String action = request.getHeader(ReplicationHeader.ACTION.toString());
-
-        if(ReplicationActionType.POLL.getName().equalsIgnoreCase(action)){
-            doRemove(request, response);
-        }
-        else {
-            doCreate(request, response);
-        }
-    }
-
-    private void doCreate(SlingHttpServletRequest request, SlingHttpServletResponse response)
-            throws IOException {
 
         response.setContentType("application/json");
 
@@ -111,56 +99,6 @@ public class ReplicationAgentServlet ext
         }
     }
 
-    private void doRemove(SlingHttpServletRequest request, SlingHttpServletResponse response) {
-
-        response.setContentType(ContentType.APPLICATION_OCTET_STREAM.toString());
-
-        String queueName = request.getParameter(ReplicationParameter.QUEUE.toString());
-
-        ReplicationAgent agent = request.getResource().adaptTo(ReplicationAgent.class);
-
-        /* directly polling an agent queue is only possible if such an agent doesn't have its own endpoint
-        (that is it just adds items to its queue to be polled remotely)*/
-        if (agent != null) {
-            try {
-                // TODO : consider using queue distribution strategy and validating who's making this request
-                log.info("getting item from queue {}", queueName);
-
-                // get first item
-                ReplicationPackage head = agent.removeHead(queueName);
-
-                if (head != null) {
-                    InputStream inputStream = null;
-                    int bytesCopied = -1;
-                    try {
-                        inputStream = head.createInputStream();
-                        bytesCopied = IOUtils.copy(inputStream, response.getOutputStream());
-                    }
-                    finally {
-                        IOUtils.closeQuietly(inputStream);
-                    }
-
-                    setPackageHeaders(response, head);
-
-                    // delete the package permanently
-                    head.delete();
-
-                    log.info("{} bytes written into the response", bytesCopied);
-
-                } else {
-                    log.info("nothing to fetch");
-                }
-            } catch (Exception e) {
-                response.setStatus(503);
-                log.error("error while reverse replicating from agent", e);
-            }
-            // everything ok
-            response.setStatus(200);
-        } else {
-            response.setStatus(404);
-        }
-    }
-
     String[] toStringArray(Enumeration<String> e){
         List<String> l = new ArrayList<String>();
         while (e.hasMoreElements()){
@@ -168,15 +106,5 @@ public class ReplicationAgentServlet ext
         }
 
         return l.toArray(new String[l.size()]);
-
-    }
-
-    void setPackageHeaders(SlingHttpServletResponse response, ReplicationPackage replicationPackage){
-        response.setHeader(ReplicationHeader.TYPE.toString(), replicationPackage.getType());
-        response.setHeader(ReplicationHeader.ACTION.toString(), replicationPackage.getAction());
-        for (String path : replicationPackage.getPaths()){
-            response.setHeader(ReplicationHeader.PATH.toString(), path);
-        }
-
     }
 }

Added: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationPackageExporterServlet.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationPackageExporterServlet.java?rev=1615605&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationPackageExporterServlet.java (added)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationPackageExporterServlet.java Mon Aug  4 15:03:06 2014
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.servlet;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.http.entity.ContentType;
+import org.apache.sling.api.SlingHttpServletRequest;
+import org.apache.sling.api.SlingHttpServletResponse;
+import org.apache.sling.api.servlets.SlingAllMethodsServlet;
+import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.communication.ReplicationHeader;
+import org.apache.sling.replication.communication.ReplicationParameter;
+import org.apache.sling.replication.resources.ReplicationConstants;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+import org.apache.sling.replication.serialization.ReplicationPackageExporter;
+import org.apache.sling.replication.serialization.ReplicationPackageImporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.Servlet;
+import javax.servlet.ServletException;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Servlet that exports a replication package by streaming it into the response of a POST request.
+ */
+@SuppressWarnings("serial")
+@Component(metatype = false)
+@Service(value = Servlet.class)
+@Properties({
+        @Property(name = "sling.servlet.resourceTypes", value = ReplicationConstants.EXPORTER_RESOURCE_TYPE),
+        @Property(name = "sling.servlet.methods", value = "POST")})
+public class ReplicationPackageExporterServlet extends SlingAllMethodsServlet {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    /**
+     * Exports a package through the {@link ReplicationPackageExporter} the requested resource adapts to,
+     * writes it into the response body and deletes it afterwards; on any exception the status is set to 503.
+     */
+    @Override
+    protected void doPost(SlingHttpServletRequest request, SlingHttpServletResponse response)
+            throws ServletException, IOException {
+        ReplicationPackageExporter replicationPackageExporter = request
+                .getResource()
+                .adaptTo(ReplicationPackageExporter.class);
+
+        boolean success = false;
+        final long start = System.currentTimeMillis();
+
+        response.setContentType(ContentType.APPLICATION_OCTET_STREAM.toString());
+        try {
+            // get first item
+            ReplicationPackage replicationPackage = replicationPackageExporter.exportPackage(null);
+            if (replicationPackage != null) {
+                // headers go first: once body bytes have committed the response, later header changes are ignored
+                setPackageHeaders(response, replicationPackage);
+                InputStream inputStream = null;
+                int bytesCopied = -1;
+                try {
+                    inputStream = replicationPackage.createInputStream();
+                    bytesCopied = IOUtils.copy(inputStream, response.getOutputStream());
+                }
+                finally {
+                    IOUtils.closeQuietly(inputStream);
+                }
+
+                // delete the package permanently
+                replicationPackage.delete();
+                log.info("{} bytes written into the response", bytesCopied);
+            } else {
+                log.info("nothing to fetch");
+            }
+            // everything ok
+            response.setStatus(200);
+            success = true;
+        } catch (Exception e) {
+            response.setStatus(503);
+            log.error("error while reverse replicating from agent", e);
+        }
+        finally {
+            final long end = System.currentTimeMillis();
+            log.info("Processed replication export request in {}ms: : {}", new Object[]{end - start, success});
+        }
+    }
+
+    /** Copies the type, the action and every path of the package into the response headers. */
+    void setPackageHeaders(SlingHttpServletResponse response, ReplicationPackage replicationPackage){
+        response.setHeader(ReplicationHeader.TYPE.toString(), replicationPackage.getType());
+        response.setHeader(ReplicationHeader.ACTION.toString(), replicationPackage.getAction());
+        for (String path : replicationPackage.getPaths()){
+            // addHeader, not setHeader: setHeader would overwrite the previous path on every iteration
+            response.addHeader(ReplicationHeader.PATH.toString(), path);
+        }
+    }
+}

Propchange: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationPackageExporterServlet.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationPackageImporterServlet.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationPackageImporterServlet.java?rev=1615605&r1=1615604&r2=1615605&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationPackageImporterServlet.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationPackageImporterServlet.java Mon Aug  4 15:03:06 2014
@@ -31,6 +31,7 @@ import org.apache.sling.api.SlingHttpSer
 import org.apache.sling.api.servlets.SlingAllMethodsServlet;
 import org.apache.sling.replication.communication.ReplicationHeader;
 import org.apache.sling.replication.resources.ReplicationConstants;
+import org.apache.sling.replication.serialization.ReplicationPackage;
 import org.apache.sling.replication.serialization.ReplicationPackageImporter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,23 +63,17 @@ public class ReplicationPackageImporterS
         response.setCharacterEncoding("utf-8");
 
         InputStream stream = request.getInputStream();
-        String type = request.getHeader(ReplicationHeader.TYPE.toString());
         try {
-           success = replicationPackageImporter.importStream(stream, type);
+            ReplicationPackage replicationPackage = replicationPackageImporter.readPackage(stream);
+            success = replicationPackageImporter.importPackage(replicationPackage);
         } catch (final Exception e) {
             response.setStatus(400);
-            if (log.isErrorEnabled()) {
-                log.error("Error during replication: {}", e.getMessage(), e);
-            }
+            log.error("Error during replication: {}", e.getMessage(), e);
             response.getWriter().print("error: " + e.toString());
         } finally {
             final long end = System.currentTimeMillis();
-            if (log.isInfoEnabled()) {
-                log.info("Processed replication request in {}ms: : {}", new Object[]{
-                        end - start, success});
-            }
+            log.info("Processed replication request in {}ms: : {}", new Object[]{end - start, success});
         }
-
     }
 
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/TransportHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/TransportHandler.java?rev=1615605&r1=1615604&r2=1615605&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/TransportHandler.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/TransportHandler.java Mon Aug  4 15:03:06 2014
@@ -33,28 +33,9 @@ public interface TransportHandler {
      * Executes the transport of a given {@link ReplicationPackage} to a specific {@link ReplicationEndpoint} using this
      * transport and the supplied {@link TransportAuthenticationProvider} for authenticating the endpoint
      *
-     * @param  agentName a replication agent name
      * @param replicationPackage  a {@link ReplicationPackage} to transport
      * @throws ReplicationTransportException if any error occurs during the transport
      */
-    void transport(String agentName, ReplicationPackage replicationPackage) throws ReplicationTransportException;
-
-
-    /**
-     * Enables response processing for this <code>TransportHandler</code> for a certain <code>ReplicationAgent</code>
-     *
-     * @param agentName the name of the <code>ReplicationAgent</code>
-     * @param responseProcessor a <code>ReplicationQueueProcessor</code> that is called by the <code>TransportHandler</code>
-     *                          whenever a response is received
-     */
-    void enableProcessing(String agentName, ReplicationQueueProcessor responseProcessor);
-
-    /**
-     *
-     * Disables response processing for this <code>TransportHandler</code>
-     *
-     * @param agentName the name of the <code>ReplicationAgent</code>
-     */
-    void disableProcessing(String agentName);
+    void transport(ReplicationPackage replicationPackage) throws ReplicationTransportException;
 
 }
\ No newline at end of file

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandler.java?rev=1615605&r1=1615604&r2=1615605&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandler.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandler.java Mon Aug  4 15:03:06 2014
@@ -34,18 +34,14 @@ public abstract class AbstractTransportH
     private int lastSuccessfulEnpointId = 0;
 
 
-    private final Map<String, ReplicationQueueProcessor> responseProcessorMap = new ConcurrentHashMap<String, ReplicationQueueProcessor>();
-
     public AbstractTransportHandler(ReplicationEndpoint[] endpoints, TransportEndpointStrategyType endpointStrategyType) {
         this.endpoints = endpoints;
         this.endpointStrategyType = endpointStrategyType;
     }
 
-    public void transport(String agentName, ReplicationPackage replicationPackage)
+    public void transport(ReplicationPackage replicationPackage)
             throws ReplicationTransportException {
 
-        ReplicationQueueProcessor responseProcessor = responseProcessorMap.get(agentName);
-
         ReplicationTransportException lastException = null;
         int offset = 0;
         if (endpointStrategyType.equals(TransportEndpointStrategyType.OneSuccessful)) {
@@ -57,7 +53,7 @@ public abstract class AbstractTransportH
 
             ReplicationEndpoint replicationEndpoint = endpoints[currentId];
             try {
-                deliverPackage(replicationPackage, replicationEndpoint, responseProcessor);
+                deliverPackage(replicationPackage, replicationEndpoint);
                 lastSuccessfulEnpointId = currentId;
                 if (endpointStrategyType.equals(TransportEndpointStrategyType.FirstSuccessful) ||
                         endpointStrategyType.equals(TransportEndpointStrategyType.OneSuccessful))
@@ -72,32 +68,22 @@ public abstract class AbstractTransportH
     }
 
 
-    public void enableProcessing(String agentName, ReplicationQueueProcessor responseProcessor){
-        responseProcessorMap.put(agentName, responseProcessor);
-    }
-
-    public void disableProcessing(String agentName){
-        responseProcessorMap.remove(agentName);
-    }
-
 
     private void deliverPackage(ReplicationPackage replicationPackage,
-                                ReplicationEndpoint replicationEndpoint,
-                                ReplicationQueueProcessor responseProcessor)
+                                ReplicationEndpoint replicationEndpoint)
             throws ReplicationTransportException {
         if (!validateEndpoint(replicationEndpoint))
             throw new ReplicationTransportException("invalid endpoint " + replicationEndpoint.getUri());
 
         try {
-            deliverPackageToEndpoint(replicationPackage, replicationEndpoint, responseProcessor);
+            deliverPackageToEndpoint(replicationPackage, replicationEndpoint);
         } catch (Exception e) {
             throw new ReplicationTransportException(e);
         }
     }
 
     protected abstract void deliverPackageToEndpoint(ReplicationPackage replicationPackage,
-                                                     ReplicationEndpoint replicationEndpoint,
-                                                     ReplicationQueueProcessor responseProcessor)
+                                                     ReplicationEndpoint replicationEndpoint)
             throws Exception;
 
     protected abstract boolean validateEndpoint(ReplicationEndpoint endpoint);



Mime
View raw message