sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomm...@apache.org
Subject svn commit: r1616781 [1/2] - in /sling/trunk/contrib/extensions/replication: core/src/main/java/org/apache/sling/replication/agent/impl/ core/src/main/java/org/apache/sling/replication/rule/impl/ core/src/main/java/org/apache/sling/replication/serializ...
Date Fri, 08 Aug 2014 14:32:48 GMT
Author: tommaso
Date: Fri Aug  8 14:32:47 2014
New Revision: 1616781

URL: http://svn.apache.org/r1616781
Log:
SLING-3830 - applied Marius Petria's patch for reenabling multiple endpoints and multiple polling

Added:
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/ReplicationTransportHandler.java   (with props)
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/AdvancedHttpReplicationTransportHandler.java   (with props)
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/MultipleEndpointReplicationTransportHandler.java   (with props)
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/SimpleHttpReplicationTransportHandler.java   (with props)
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/importer/AdvancedRemoteReplicationPackageImporter.java   (with props)
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/publish/org.apache.sling.replication.transport.impl.importer.AdvancedRemoteReplicationPackageImporter-cache-flush.json
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/transport/impl/AdvancedHttpReplicationTransportHandlerTest.java
      - copied, changed from r1616736, sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerCustomHeadersTest.java
Removed:
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/TransportHandler.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandler.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandlerFactory.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandler.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerFactory.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandler.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandlerFactory.java
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/publish/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-cache-flush.json
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerCustomHeadersTest.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerTest.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandlerTest.java
Modified:
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageExporter.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/exporter/AgentReplicationPackageExporter.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/exporter/LocalReplicationPackageExporter.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/importer/LocalReplicationPackageImporter.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackage.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackageBuilder.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationPackageExporterServlet.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/ReplicationTransportException.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/authentication/TransportAuthenticationException.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/authentication/TransportAuthenticationProvider.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/authentication/impl/NopTransportAuthenticationProvider.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/TransportEndpointStrategyType.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/exporter/RemoteReplicationPackageExporter.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/importer/RemoteReplicationPackageImporter.java
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/publish/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-cache-flush.json
    sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/config/resourceproviders/configs/org.apache.sling.replication.resources.impl.OsgiPropertiesResourceProviderFactory-agentsConfig.json
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactoryTest.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java
    sling/trunk/contrib/extensions/replication/it/pom.xml
    sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ReplicationAgentResourcesIntegrationTest.java

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java?rev=1616781&r1=1616780&r2=1616781&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java Fri Aug  8 14:32:47 2014
@@ -84,16 +84,30 @@ public class SimpleReplicationAgent impl
             throws AgentReplicationException {
 
         // create packages from request
-        ReplicationPackage[] replicationPackages = buildPackages(replicationRequest);
+        List<ReplicationPackage> replicationPackages;
+        try {
+            replicationPackages = buildPackages(replicationRequest);
+            return schedule(replicationPackages, false);
+
+        } catch (ReplicationPackageBuildingException e) {
+            log.error("Error building packages", e);
+            throw new AgentReplicationException(e);
+        }
 
-        return schedule(replicationPackages, false);
     }
 
     public void send(ReplicationRequest replicationRequest) throws AgentReplicationException {
         // create packages from request
-        ReplicationPackage[] replicationPackages = buildPackages(replicationRequest);
+        List<ReplicationPackage> replicationPackages = null;
+        try {
+            replicationPackages = buildPackages(replicationRequest);
+            schedule(replicationPackages, true);
+
+        } catch (ReplicationPackageBuildingException e) {
+            log.error("Error building packages", e);
+            throw new AgentReplicationException(e);
+        }
 
-        schedule(replicationPackages, true);
     }
 
     public boolean isPassive() {
@@ -101,46 +115,31 @@ public class SimpleReplicationAgent impl
     }
 
 
-    private ReplicationPackage buildPackage(ReplicationRequest replicationRequest) throws AgentReplicationException {
-        // create package from request
-        ReplicationPackage replicationPackage;
-        try {
-            replicationPackage = replicationPackageExporter.exportPackage(replicationRequest);
-        } catch (ReplicationPackageBuildingException e) {
-            throw new AgentReplicationException(e);
-        }
-
-        return replicationPackage;
-    }
+    private List<ReplicationPackage> buildPackages(ReplicationRequest replicationRequest) throws ReplicationPackageBuildingException {
 
-    private ReplicationPackage[] buildPackages(ReplicationRequest replicationRequest) throws AgentReplicationException {
-
-        List<ReplicationPackage> packages = new ArrayList<ReplicationPackage>();
+        List<ReplicationPackage> replicationPackages = new ArrayList<ReplicationPackage>();
 
         if (useAggregatePaths) {
-            ReplicationPackage replicationPackage = buildPackage(replicationRequest);
-            if (replicationPackage != null) {
-                packages.add(replicationPackage);
-            }
+            List<ReplicationPackage> exportedPackages = replicationPackageExporter.exportPackage(replicationRequest);
+            replicationPackages.addAll(exportedPackages);
         } else {
             for (String path : replicationRequest.getPaths()) {
-                ReplicationPackage replicationPackage = buildPackage(new ReplicationRequest(replicationRequest.getTime(),
+                ReplicationRequest splitReplicationRequest = new ReplicationRequest(replicationRequest.getTime(),
                         replicationRequest.getAction(),
-                        path));
-                if (replicationPackage != null) {
-                    packages.add(replicationPackage);
-                }
+                        path);
+                List<ReplicationPackage> exportedPackages = replicationPackageExporter.exportPackage(splitReplicationRequest);
+                replicationPackages.addAll(exportedPackages);
             }
         }
 
-        return packages.toArray(new ReplicationPackage[packages.size()]);
+        return replicationPackages;
     }
 
     // offer option throws an exception at first error
-    private ReplicationResponse schedule(ReplicationPackage[] packages, boolean offer) throws AgentReplicationException {
+    private ReplicationResponse schedule(List<ReplicationPackage> replicationPackages, boolean offer) throws AgentReplicationException {
         ReplicationResponse replicationResponse = new ReplicationResponse();
 
-        for (ReplicationPackage replicationPackage : packages) {
+        for (ReplicationPackage replicationPackage : replicationPackages) {
             ReplicationResponse currentReplicationResponse = schedule(replicationPackage, offer);
 
             replicationResponse.setSuccessful(currentReplicationResponse.isSuccessful());
@@ -152,9 +151,8 @@ public class SimpleReplicationAgent impl
 
     private ReplicationResponse schedule(ReplicationPackage replicationPackage, boolean offer) throws AgentReplicationException {
         ReplicationResponse replicationResponse = new ReplicationResponse();
-        if (log.isInfoEnabled()) {
-            log.info("scheduling replication of package {}", replicationPackage);
-        }
+        log.info("scheduling replication of package {}", replicationPackage);
+
         ReplicationQueueItem replicationQueueItem = new ReplicationQueueItem(replicationPackage.getId(),
                 replicationPackage.getPaths(),
                 replicationPackage.getAction(),

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java?rev=1616781&r1=1616780&r2=1616781&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java Fri Aug  8 14:32:47 2014
@@ -53,7 +53,7 @@ import org.apache.sling.replication.comm
 import org.apache.sling.replication.communication.ReplicationRequest;
 import org.apache.sling.replication.resources.ReplicationConstants;
 import org.apache.sling.replication.rule.ReplicationRule;
-import org.apache.sling.replication.transport.TransportHandler;
+import org.apache.sling.replication.transport.ReplicationTransportHandler;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceReference;
 import org.slf4j.Logger;
@@ -189,7 +189,7 @@ public class ReplicateOnQueueEventRule i
 
         public void run() {
             try {
-                ServiceReference[] serviceReferences = context.getServiceReferences(TransportHandler.class.getName(), targetTransport);
+                ServiceReference[] serviceReferences = context.getServiceReferences(ReplicationTransportHandler.class.getName(), targetTransport);
 
                 log.info("reference transport for {} found {}", targetTransport, serviceReferences != null);
 

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageExporter.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageExporter.java?rev=1616781&r1=1616780&r2=1616781&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageExporter.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageExporter.java Fri Aug  8 14:32:47 2014
@@ -21,6 +21,8 @@ package org.apache.sling.replication.ser
 
 import org.apache.sling.replication.communication.ReplicationRequest;
 
+import java.util.List;
+
 /**
  * A {@link org.apache.sling.replication.serialization.ReplicationPackage) exporter
  */
@@ -29,7 +31,7 @@ public interface ReplicationPackageExpor
      * Exports a replication package.
      * @return the first available package in the exporter.
      */
-    ReplicationPackage exportPackage(ReplicationRequest replicationRequest) throws ReplicationPackageBuildingException;
+    List<ReplicationPackage> exportPackage(ReplicationRequest replicationRequest) throws ReplicationPackageBuildingException;
 
     /**
      * Exports a replication package.

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/exporter/AgentReplicationPackageExporter.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/exporter/AgentReplicationPackageExporter.java?rev=1616781&r1=1616780&r2=1616781&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/exporter/AgentReplicationPackageExporter.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/exporter/AgentReplicationPackageExporter.java Fri Aug  8 14:32:47 2014
@@ -18,6 +18,8 @@
  */
 package org.apache.sling.replication.serialization.impl.exporter;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.felix.scr.annotations.*;
@@ -27,7 +29,7 @@ import org.apache.sling.replication.comm
 import org.apache.sling.replication.serialization.ReplicationPackage;
 import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
 import org.apache.sling.replication.serialization.ReplicationPackageExporter;
-import org.osgi.framework.BundleContext;
+import org.apache.sling.replication.serialization.impl.vlt.FileVaultReplicationPackageBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,8 +48,10 @@ public class AgentReplicationPackageExpo
     @Reference(name = "ReplicationAgent", target = "(name=reverse)", policy = ReferencePolicy.STATIC)
     private ReplicationAgent agent;
 
-    @Property(label = "Target ReplicationPackageBuilder", name = "ReplicationPackageBuilder.target", value = "(name=vlt)")
-    @Reference(name = "ReplicationPackageBuilder", target = "(name=vlt)", policy = ReferencePolicy.STATIC)
+    @Property(label = "Target ReplicationPackageBuilder", name = "ReplicationPackageBuilder.target", value = "(name="
+            + FileVaultReplicationPackageBuilder.NAME + ")")
+    @Reference(name = "ReplicationPackageBuilder", target = "(name=" + FileVaultReplicationPackageBuilder.NAME + ")",
+            policy = ReferencePolicy.STATIC)
     private ReplicationPackageBuilder replicationPackageBuilder;
 
     private String queueName;
@@ -57,22 +61,22 @@ public class AgentReplicationPackageExpo
         queueName = PropertiesUtil.toString(config.get(QUEUE_NAME), "");
     }
 
-    public ReplicationPackage exportPackage(ReplicationRequest replicationRequest) {
+    public List<ReplicationPackage> exportPackage(ReplicationRequest replicationRequest) {
 
+        List<ReplicationPackage> result = new ArrayList<ReplicationPackage>();
         try {
             if (log.isInfoEnabled()) {
                 log.info("getting item from queue {}", queueName);
             }
 
             // get first item
-            return agent.removeHead(queueName);
+            ReplicationPackage headPackage = agent.removeHead(queueName);
+            result.add(headPackage);
         } catch (Exception ex) {
-            if (log.isErrorEnabled()) {
-                log.error("Error exporting package", ex);
-            }
+            log.error("Error exporting package", ex);
         }
 
-        return null;
+        return result;
     }
 
     public ReplicationPackage exportPackageById(String replicationPackageId) {

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/exporter/LocalReplicationPackageExporter.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/exporter/LocalReplicationPackageExporter.java?rev=1616781&r1=1616780&r2=1616781&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/exporter/LocalReplicationPackageExporter.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/exporter/LocalReplicationPackageExporter.java Fri Aug  8 14:32:47 2014
@@ -28,6 +28,9 @@ import org.apache.sling.replication.seri
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * {@link org.apache.sling.replication.serialization.ReplicationPackageExporter} implementation which creates a FileVault based
  * {@link org.apache.sling.replication.serialization.ReplicationPackage} locally.
@@ -39,15 +42,19 @@ public class LocalReplicationPackageExpo
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    @Property(label = "Name")
     public static final String NAME = "local";
 
     @Reference(name = "ReplicationPackageBuilder", target = "(name=" + FileVaultReplicationPackageBuilder.NAME + ")",
             policy = ReferencePolicy.STATIC)
     private ReplicationPackageBuilder packageBuilder;
 
-    public ReplicationPackage exportPackage(ReplicationRequest replicationRequest) throws ReplicationPackageBuildingException {
-        return packageBuilder.createPackage(replicationRequest);
+    public List<ReplicationPackage> exportPackage(ReplicationRequest replicationRequest) throws ReplicationPackageBuildingException {
+        List<ReplicationPackage> result = new ArrayList<ReplicationPackage>();
+
+        ReplicationPackage createdPackage = packageBuilder.createPackage(replicationRequest);
+        result.add(createdPackage);
+
+        return result;
     }
 
     public ReplicationPackage exportPackageById(String replicationPackageId) {

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/importer/LocalReplicationPackageImporter.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/importer/LocalReplicationPackageImporter.java?rev=1616781&r1=1616780&r2=1616781&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/importer/LocalReplicationPackageImporter.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/importer/LocalReplicationPackageImporter.java Fri Aug  8 14:32:47 2014
@@ -62,7 +62,9 @@ public class LocalReplicationPackageImpo
             success = replicationPackageBuilder.installPackage(replicationPackage);
 
             if (success) {
-                log.info("replication package read and installed for path(s) {}", Arrays.toString(replicationPackage.getPaths()));
+                if (log.isInfoEnabled()) {
+                    log.info("replication package read and installed for path(s) {}", Arrays.toString(replicationPackage.getPaths()));
+                }
 
                 Dictionary<String, Object> dictionary = new Hashtable<String, Object>();
                 dictionary.put("replication.action", replicationPackage.getAction());
@@ -71,10 +73,14 @@ public class LocalReplicationPackageImpo
 
                 replicationPackage.delete();
             } else {
-                log.warn("could not read a replication package");
+                if (log.isWarnEnabled()) {
+                    log.warn("could not read a replication package");
+                }
             }
         } catch (Exception e) {
-            log.error("cannot import a package from the given stream of type {}", replicationPackage.getType());
+            if (log.isErrorEnabled()) {
+                log.error("cannot import a package from the given stream of type {}", replicationPackage.getType());
+            }
         }
         return success;
     }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackage.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackage.java?rev=1616781&r1=1616780&r2=1616781&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackage.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackage.java Fri Aug  8 14:32:47 2014
@@ -53,7 +53,7 @@ public class FileVaultReplicationPackage
             paths[i] = filterSets.get(i).getRoot();
         }
         this.paths = paths;
-        this.id = pkg.getId() != null ? String.valueOf(pkg.getId()) : pkg.getFile().getAbsolutePath();
+        this.id = pkg.getFile().getAbsolutePath();
         this.action = ReplicationActionType.ADD.toString();
     }
 

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackageBuilder.java?rev=1616781&r1=1616780&r2=1616781&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackageBuilder.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackageBuilder.java Fri Aug  8 14:32:47 2014
@@ -22,9 +22,11 @@ import javax.jcr.RepositoryException;
 import javax.jcr.Session;
 import javax.jcr.SimpleCredentials;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.InputStream;
 import java.util.Properties;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.felix.scr.annotations.*;
 import org.apache.jackrabbit.vault.fs.api.PathFilterSet;
 import org.apache.jackrabbit.vault.fs.config.DefaultMetaInf;
@@ -108,11 +110,10 @@ public class FileVaultReplicationPackage
 
             opts.setMetaInf(inf);
             opts.setRootPath("/");
-            File tmpFile = File.createTempFile("vlt-rp-" + System.nanoTime(), ".zip");
-            packaging.getPackageManager().assemble(session, opts, tmpFile);
-            JcrPackage jcrPackage = packaging.getPackageManager(session).upload(tmpFile, false, true, null);
+            File tmpFile = File.createTempFile("rp-vlt-create-" + System.nanoTime(), ".zip");
+            VaultPackage vaultPackage = packaging.getPackageManager().assemble(session, opts, tmpFile);
 
-            return new FileVaultReplicationPackage(jcrPackage.getPackage());
+            return new FileVaultReplicationPackage(vaultPackage);
         } catch (Exception e) {
             throw new ReplicationPackageBuildingException(e);
         } finally {
@@ -141,13 +142,15 @@ public class FileVaultReplicationPackage
         Session session = null;
         ReplicationPackage pkg = null;
         try {
-            session = getSession();
-            if (session != null) {
-                final JcrPackage jcrPackage = packaging.getPackageManager(session).upload(stream, true,
-                        false);
+            File tmpFile = File.createTempFile("rp-vlt-read-" + System.nanoTime(), ".zip");
+            FileOutputStream fileStream = new FileOutputStream(tmpFile);
+            IOUtils.copy(stream, fileStream);
+            IOUtils.closeQuietly(fileStream);
+
+            VaultPackage vaultPackage = packaging.getPackageManager().open(tmpFile);
+
+            pkg = new FileVaultReplicationPackage(vaultPackage);
 
-                pkg = new FileVaultReplicationPackage(jcrPackage.getPackage());
-            }
         } catch (Exception e) {
             log.error("could not read / install the package", e);
             throw new ReplicationPackageReadingException(e);
@@ -169,10 +172,6 @@ public class FileVaultReplicationPackage
                 VaultPackage pkg = packaging.getPackageManager().open(file);
                 replicationPackage = new FileVaultReplicationPackage(pkg);
             }
-            else {
-                VaultPackage pkg = packaging.getPackageManager(getSession()).open(PackageId.fromString(id)).getPackage();
-                replicationPackage = new FileVaultReplicationPackage(pkg);
-            }
         } catch (Exception e) {
             log.warn("could not find a package with id : {}", id);
         }
@@ -189,24 +188,20 @@ public class FileVaultReplicationPackage
 
     @Override
     public boolean installPackageInternal(ReplicationPackage replicationPackage) throws ReplicationPackageReadingException {
-        if (log.isDebugEnabled()) {
-            log.debug("reading a replication package stream");
-        }
+        log.debug("reading a replication package stream");
+
 
         Session session = null;
         try {
             session = getSession();
-            if (session != null) {
-                final JcrPackage jcrPackage = packaging.getPackageManager(getSession())
-                        .open(PackageId.fromString(replicationPackage.getId()));
-
-                jcrPackage.install(new ImportOptions());
-
+            File file = new File(replicationPackage.getId());
+            if (file.exists()) {
+                VaultPackage pkg = packaging.getPackageManager().open(file);
+                pkg.extract(session, new ImportOptions());
+                return true;
             }
         } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error("could not read / install the package", e);
-            }
+            log.error("could not read / install the package", e);
             throw new ReplicationPackageReadingException(e);
         } finally {
             if (session != null) {

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationPackageExporterServlet.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationPackageExporterServlet.java?rev=1616781&r1=1616780&r2=1616781&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationPackageExporterServlet.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationPackageExporterServlet.java Fri Aug  8 14:32:47 2014
@@ -41,6 +41,7 @@ import javax.servlet.Servlet;
 import javax.servlet.ServletException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.List;
 
 /**
  * Servlet to handle reception of replication content.
@@ -71,31 +72,35 @@ public class ReplicationPackageExporterS
 
         try {
             // get first item
-            ReplicationPackage replicationPackage = replicationPackageExporter.exportPackage(null);
+            List<ReplicationPackage> replicationPackages = replicationPackageExporter.exportPackage(null);
 
-            if (replicationPackage != null) {
-                InputStream inputStream = null;
-                int bytesCopied = -1;
-                try {
-                    inputStream = replicationPackage.createInputStream();
-                    bytesCopied = IOUtils.copy(inputStream, response.getOutputStream());
-                }
-                finally {
-                    IOUtils.closeQuietly(inputStream);
-                }
-
-                setPackageHeaders(response, replicationPackage);
-
-                // delete the package permanently
-                replicationPackage.delete();
-
-                log.info("{} bytes written into the response", bytesCopied);
+            if (replicationPackages.size() > 0) {
+                log.info("{} package(s) available for fetching, the first will be delivered", replicationPackages.size());
 
-            } else {
+                ReplicationPackage replicationPackage = replicationPackages.get(0);
+                if (replicationPackage != null) {
+                    InputStream inputStream = null;
+                    int bytesCopied = -1;
+                    try {
+                        inputStream = replicationPackage.createInputStream();
+                        bytesCopied = IOUtils.copy(inputStream, response.getOutputStream());
+                    }
+                    finally {
+                        IOUtils.closeQuietly(inputStream);
+                    }
+
+                    // delete the package permanently
+                    replicationPackage.delete();
+
+                    // everything ok
+                    response.setStatus(200);
+                    log.info("{} bytes written into the response", bytesCopied);
+                }
+            } else  {
+                response.setStatus(204);
                 log.info("nothing to fetch");
             }
-            // everything ok
-            response.setStatus(200);
+
         } catch (Exception e) {
             response.setStatus(503);
             log.error("error while reverse replicating from agent", e);
@@ -106,12 +111,4 @@ public class ReplicationPackageExporterS
         }
     }
 
-    void setPackageHeaders(SlingHttpServletResponse response, ReplicationPackage replicationPackage){
-        response.setHeader(ReplicationHeader.TYPE.toString(), replicationPackage.getType());
-        response.setHeader(ReplicationHeader.ACTION.toString(), replicationPackage.getAction());
-        for (String path : replicationPackage.getPaths()){
-            response.setHeader(ReplicationHeader.PATH.toString(), path);
-        }
-    }
-
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/ReplicationTransportException.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/ReplicationTransportException.java?rev=1616781&r1=1616780&r2=1616781&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/ReplicationTransportException.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/ReplicationTransportException.java Fri Aug  8 14:32:47 2014
@@ -19,7 +19,7 @@
 package org.apache.sling.replication.transport;
 
 /**
- * Represents an error happened while a {@link TransportHandler} is delivering a replication item to
+ * Represents an error happened while a {@link ReplicationTransportHandler} is delivering a replication item to
  * and endpoint.
  */
 @SuppressWarnings("serial")

Added: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/ReplicationTransportHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/ReplicationTransportHandler.java?rev=1616781&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/ReplicationTransportHandler.java (added)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/ReplicationTransportHandler.java Fri Aug  8 14:32:47 2014
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.transport;
+
+import org.apache.sling.replication.communication.ReplicationEndpoint;
+import org.apache.sling.replication.queue.ReplicationQueueProcessor;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+import org.apache.sling.replication.transport.authentication.TransportAuthenticationProvider;
+
+import java.util.List;
+
+/**
+ * A <code>TransportHandler</code> is responsible for implementing the transport of a
+ * {@link ReplicationPackage}s to / from another instance described by a {@link ReplicationEndpoint}
+ */
+public interface ReplicationTransportHandler {
+
+    /**
+     * Delivers a given {@link ReplicationPackage}
+     *
+     * @param replicationPackage  a {@link ReplicationPackage} to transport
+     * @throws ReplicationTransportException if any error occurs during the transport
+     */
+    void deliverPackage(ReplicationPackage replicationPackage) throws ReplicationTransportException;
+
+    /**
+     * Retrieves a list of {@link ReplicationPackage}
+     *
+     * @throws ReplicationTransportException if any error occurs during the transport
+     */
+    public List<ReplicationPackage> retrievePackage() throws ReplicationTransportException;
+
+}
\ No newline at end of file

Propchange: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/ReplicationTransportHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/authentication/TransportAuthenticationException.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/authentication/TransportAuthenticationException.java?rev=1616781&r1=1616780&r2=1616781&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/authentication/TransportAuthenticationException.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/authentication/TransportAuthenticationException.java Fri Aug  8 14:32:47 2014
@@ -18,7 +18,9 @@
  */
 package org.apache.sling.replication.transport.authentication;
 
-
+/**
+ * Represents an error in transport authentication
+ */
 @SuppressWarnings("serial")
 public class TransportAuthenticationException extends Exception {
 

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/authentication/TransportAuthenticationProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/authentication/TransportAuthenticationProvider.java?rev=1616781&r1=1616780&r2=1616781&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/authentication/TransportAuthenticationProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/authentication/TransportAuthenticationProvider.java Fri Aug  8 14:32:47 2014
@@ -21,7 +21,7 @@ package org.apache.sling.replication.tra
 
 /**
  * A <code>TransportAuthenticationProvider</code> is responsible for authentication of instances sending and
- * receiving replication items via {@link org.apache.sling.replication.transport.TransportHandler}s
+ * receiving replication items via {@link org.apache.sling.replication.transport.ReplicationTransportHandler}s
  */
 public interface TransportAuthenticationProvider<A, T> {
 

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/authentication/impl/NopTransportAuthenticationProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/authentication/impl/NopTransportAuthenticationProvider.java?rev=1616781&r1=1616780&r2=1616781&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/authentication/impl/NopTransportAuthenticationProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/authentication/impl/NopTransportAuthenticationProvider.java Fri Aug  8 14:32:47 2014
@@ -18,7 +18,6 @@
  */
 package org.apache.sling.replication.transport.authentication.impl;
 
-import org.apache.sling.replication.transport.TransportHandler;
 import org.apache.sling.replication.transport.authentication.TransportAuthenticationContext;
 import org.apache.sling.replication.transport.authentication.TransportAuthenticationException;
 import org.apache.sling.replication.transport.authentication.TransportAuthenticationProvider;
@@ -28,9 +27,7 @@ import org.apache.sling.replication.tran
  */
 public class NopTransportAuthenticationProvider implements TransportAuthenticationProvider<Object, Object> {
 
-    public boolean supportsTransportHandler(TransportHandler transportHandler) {
-        return true;
-    }
+
 
     public Object authenticate(Object authenticable, TransportAuthenticationContext context)
                     throws TransportAuthenticationException {

Added: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/AdvancedHttpReplicationTransportHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/AdvancedHttpReplicationTransportHandler.java?rev=1616781&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/AdvancedHttpReplicationTransportHandler.java (added)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/AdvancedHttpReplicationTransportHandler.java Fri Aug  8 14:32:47 2014
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.transport.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.client.fluent.Content;
+import org.apache.http.client.fluent.Executor;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.entity.ContentType;
+import org.apache.sling.replication.communication.ReplicationEndpoint;
+import org.apache.sling.replication.communication.ReplicationHeader;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
+import org.apache.sling.replication.transport.ReplicationTransportException;
+import org.apache.sling.replication.transport.authentication.TransportAuthenticationContext;
+import org.apache.sling.replication.transport.authentication.TransportAuthenticationProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Advanced HTTP {@link org.apache.sling.replication.transport.ReplicationTransportHandler} supporting custom HTTP headers
+ * and body.
+ */
+public class AdvancedHttpReplicationTransportHandler extends SimpleHttpReplicationTransportHandler {
+
+    private static final String PATH_VARIABLE_NAME = "{path}";
+
+    private static final Logger log = LoggerFactory.getLogger(AdvancedHttpReplicationTransportHandler.class);
+
+    private final TransportAuthenticationProvider<Executor, Executor> transportAuthenticationProvider;
+    private final ReplicationEndpoint replicationEndpoint;
+
+    private final boolean useCustomHeaders;
+
+    private final String[] customHeaders;
+
+    private final boolean useCustomBody;
+
+    private final String customBody;
+
+    public AdvancedHttpReplicationTransportHandler(boolean useCustomHeaders,
+                                                   String[] customHeaders,
+                                                   boolean useCustomBody,
+                                                   String customBody,
+                                                   TransportAuthenticationProvider<Executor, Executor> transportAuthenticationProvider,
+                                                   ReplicationEndpoint replicationEndpoint,
+                                                   ReplicationPackageBuilder packageBuilder,
+                                                   int maxNoOfPackages) {
+
+
+        super(transportAuthenticationProvider, replicationEndpoint, packageBuilder, maxNoOfPackages);
+        this.useCustomHeaders = useCustomHeaders;
+        this.customHeaders = customHeaders;
+        this.useCustomBody = useCustomBody;
+        this.customBody = customBody;
+        this.transportAuthenticationProvider = transportAuthenticationProvider;
+
+        this.replicationEndpoint = replicationEndpoint;
+    }
+
+    @Override
+    public void deliverPackage(ReplicationPackage replicationPackage) throws ReplicationTransportException {
+        log.info("delivering package {} to {} using auth {}",
+                new Object[]{replicationPackage.getId(),
+                        replicationEndpoint.getUri(), transportAuthenticationProvider});
+
+
+        try {
+            Executor executor = Executor.newInstance();
+            TransportAuthenticationContext context = new TransportAuthenticationContext();
+            context.addAttribute("endpoint", replicationEndpoint);
+            executor = transportAuthenticationProvider.authenticate(executor, context);
+
+            deliverPackage(executor, replicationPackage, replicationEndpoint);
+
+        } catch (Exception ex) {
+            throw new ReplicationTransportException(ex);
+        }
+
+    }
+
+    public static String[] getCustomizedHeaders(String[] additionalHeaders, String action, String[] paths) {
+        List<String> headers = new ArrayList<String>();
+
+        for (String additionalHeader : additionalHeaders) {
+            int idx = additionalHeader.indexOf("->");
+
+            if (idx < 0) {
+                headers.add(additionalHeader);
+            } else {
+                String actionSelector = additionalHeader.substring(0, idx).trim();
+                String header = additionalHeader.substring(idx + 2).trim();
+
+                if (actionSelector.equalsIgnoreCase(action) || actionSelector.equals("*")) {
+                    headers.add(header);
+                }
+            }
+        }
+
+        StringBuilder sb = new StringBuilder();
+
+        if (paths != null && paths.length > 0) {
+            sb.append(paths[0]);
+            for (int i = 1; i < paths.length; i++) {
+                sb.append(", ").append(paths[i]);
+            }
+        }
+
+        String path = sb.toString();
+
+        List<String> boundHeaders = new ArrayList<String>();
+
+        for (String header : headers) {
+            boundHeaders.add(header.replace(PATH_VARIABLE_NAME, path));
+        }
+
+        return boundHeaders.toArray(new String[boundHeaders.size()]);
+    }
+
+    private void deliverPackage(Executor executor, ReplicationPackage replicationPackage,
+                                ReplicationEndpoint replicationEndpoint) throws IOException {
+        String type = replicationPackage.getType();
+
+
+        Request req = Request.Post(replicationEndpoint.getUri()).useExpectContinue();
+
+        if (useCustomHeaders) {
+            String[] customizedHeaders = getCustomizedHeaders(customHeaders, replicationPackage.getAction(), replicationPackage.getPaths());
+            for (String header : customizedHeaders) {
+                addHeader(req, header);
+            }
+        } else {
+            req.addHeader(ReplicationHeader.TYPE.toString(), type);
+        }
+
+        InputStream inputStream = null;
+        Response response = null;
+        try {
+            if (useCustomBody) {
+                String body = customBody == null ? "" : customBody;
+                inputStream = new ByteArrayInputStream(body.getBytes("UTF-8"));
+            } else {
+                inputStream = replicationPackage.createInputStream();
+            }
+
+            if (inputStream != null) {
+                req = req.bodyStream(inputStream, ContentType.APPLICATION_OCTET_STREAM);
+            }
+
+            response = executor.execute(req);
+        } finally {
+            IOUtils.closeQuietly(inputStream);
+        }
+
+        if (response != null) {
+            Content content = response.returnContent();
+            if (log.isInfoEnabled()) {
+                log.info("Replication content of type {} for {} delivered: {}", new Object[]{
+                        type, Arrays.toString(replicationPackage.getPaths()), content});
+            }
+        } else {
+            throw new IOException("response is empty");
+        }
+    }
+
+    private static void addHeader(Request req, String header) {
+        int idx = header.indexOf(":");
+        if (idx < 0) return;
+        String headerName = header.substring(0, idx).trim();
+        String headerValue = header.substring(idx + 1).trim();
+        req.addHeader(headerName, headerValue);
+    }
+}
\ No newline at end of file

Propchange: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/AdvancedHttpReplicationTransportHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/MultipleEndpointReplicationTransportHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/MultipleEndpointReplicationTransportHandler.java?rev=1616781&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/MultipleEndpointReplicationTransportHandler.java (added)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/MultipleEndpointReplicationTransportHandler.java Fri Aug  8 14:32:47 2014
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.transport.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.sling.replication.serialization.ReplicationPackage;
+import org.apache.sling.replication.transport.ReplicationTransportException;
+import org.apache.sling.replication.transport.ReplicationTransportHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link org.apache.sling.replication.transport.ReplicationTransportHandler} supporting delivery / retrieval from multiple
+ * endpoints.
+ */
+public class MultipleEndpointReplicationTransportHandler implements ReplicationTransportHandler {
+
+    private final List<ReplicationTransportHandler> transportHelpers;
+    private final TransportEndpointStrategyType endpointStrategyType;
+    private int lastSuccessfulEnpointId = 0;
+
+    public MultipleEndpointReplicationTransportHandler(List<ReplicationTransportHandler> transportHelpers,
+                                                       TransportEndpointStrategyType endpointStrategyType) {
+        this.transportHelpers = transportHelpers;
+        this.endpointStrategyType = endpointStrategyType;
+    }
+
+    private List<ReplicationPackage> doTransport(boolean isPolling,
+                                                 ReplicationPackage replicationPackage) throws ReplicationTransportException {
+
+        int offset = 0;
+        if (endpointStrategyType.equals(TransportEndpointStrategyType.One)) {
+            offset = lastSuccessfulEnpointId;
+        }
+
+        int length = transportHelpers.size();
+        List<ReplicationPackage> result = new ArrayList<ReplicationPackage>();
+
+        for (int i = 0; i < length; i++) {
+            int currentId = (offset + i) % length;
+
+            ReplicationTransportHandler transportHelper = transportHelpers.get(currentId);
+            if (!isPolling) {
+                transportHelper.deliverPackage(replicationPackage);
+            } else {
+                List<ReplicationPackage> retrievedPackages = transportHelper.retrievePackage();
+                result.addAll(retrievedPackages);
+            }
+
+            lastSuccessfulEnpointId = currentId;
+            if (endpointStrategyType.equals(TransportEndpointStrategyType.One))
+                break;
+        }
+
+        return result;
+    }
+
+    public void deliverPackage(ReplicationPackage replicationPackage) throws ReplicationTransportException {
+        doTransport(false, replicationPackage);
+    }
+
+    public List<ReplicationPackage> retrievePackage() throws ReplicationTransportException {
+        return doTransport(true, null);
+    }
+
+
+}

Propchange: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/MultipleEndpointReplicationTransportHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/SimpleHttpReplicationTransportHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/SimpleHttpReplicationTransportHandler.java?rev=1616781&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/SimpleHttpReplicationTransportHandler.java (added)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/SimpleHttpReplicationTransportHandler.java Fri Aug  8 14:32:47 2014
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.transport.impl;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.fluent.Content;
+import org.apache.http.client.fluent.Executor;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.conn.HttpHostConnectException;
+import org.apache.http.entity.ContentType;
+import org.apache.sling.replication.communication.ReplicationActionType;
+import org.apache.sling.replication.communication.ReplicationEndpoint;
+import org.apache.sling.replication.communication.ReplicationHeader;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
+import org.apache.sling.replication.transport.ReplicationTransportException;
+import org.apache.sling.replication.transport.ReplicationTransportHandler;
+import org.apache.sling.replication.transport.authentication.TransportAuthenticationContext;
+import org.apache.sling.replication.transport.authentication.TransportAuthenticationProvider;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SimpleHttpReplicationTransportHandler implements ReplicationTransportHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(SimpleHttpReplicationTransportHandler.class);
+    private final TransportAuthenticationProvider<Executor, Executor> transportAuthenticationProvider;
+    private final ReplicationEndpoint replicationEndpoint;
+    private final ReplicationPackageBuilder packageBuilder;
+    private final int maxNumberOfPackages;
+
+    public SimpleHttpReplicationTransportHandler(TransportAuthenticationProvider<Executor, Executor> transportAuthenticationProvider,
+                                                 ReplicationEndpoint replicationEndpoint,
+                                                 ReplicationPackageBuilder packageBuilder,
+                                                 int maxNumberOfPackages) {
+        this.transportAuthenticationProvider = transportAuthenticationProvider;
+        this.replicationEndpoint = replicationEndpoint;
+        this.packageBuilder = packageBuilder;
+        this.maxNumberOfPackages = maxNumberOfPackages;
+    }
+
+    public void deliverPackage(ReplicationPackage replicationPackage) throws ReplicationTransportException {
+        log.info("delivering package {} to {} using auth {}",
+                new Object[]{replicationPackage.getId(),
+                        replicationEndpoint.getUri(), transportAuthenticationProvider});
+
+        try {
+            Executor executor = Executor.newInstance();
+            TransportAuthenticationContext context = new TransportAuthenticationContext();
+            context.addAttribute("endpoint", replicationEndpoint);
+            executor =  transportAuthenticationProvider.authenticate(executor, context);
+
+            Request req = Request.Post(replicationEndpoint.getUri()).useExpectContinue();
+
+            InputStream inputStream = null;
+            Response response = null;
+            try{
+
+                inputStream = replicationPackage.createInputStream();
+
+
+                if(inputStream != null) {
+                    req = req.bodyStream(inputStream, ContentType.APPLICATION_OCTET_STREAM);
+                }
+
+                response = executor.execute(req);
+            }
+            finally {
+                IOUtils.closeQuietly(inputStream);
+            }
+
+            if (response != null) {
+                Content content = response.returnContent();
+                log.info("Replication content of type {} for {} delivered: {}", new Object[]{
+                        replicationPackage.getType(), Arrays.toString(replicationPackage.getPaths()), content});
+            }
+            else {
+                throw new IOException("response is empty");
+            }
+
+        }
+        catch (Exception ex) {
+            throw new ReplicationTransportException(ex);
+        }
+
+    }
+
+    public List<ReplicationPackage> retrievePackage() throws ReplicationTransportException {
+        log.debug("polling from {}", replicationEndpoint.getUri());
+
+        try {
+            List<ReplicationPackage> result = new ArrayList<ReplicationPackage>();
+
+            Executor executor = Executor.newInstance();
+            TransportAuthenticationContext context = new TransportAuthenticationContext();
+            context.addAttribute("endpoint", replicationEndpoint);
+            executor = transportAuthenticationProvider.authenticate(executor, context);
+
+            Request req = Request.Post(replicationEndpoint.getUri())
+                    .addHeader(ReplicationHeader.ACTION.toString(), ReplicationActionType.POLL.getName())
+                    .useExpectContinue();
+            // TODO : add queue parameter
+
+            // continuously requests package streams as long as type header is received with the response (meaning there's a package of a certain type)
+            HttpResponse httpResponse;
+            try {
+
+                int polls = maxNumberOfPackages;
+                while ((httpResponse = executor.execute(req).returnResponse()).getStatusLine().getStatusCode() == 200
+                        && polls > 0) {
+                    ReplicationPackage responsePackage = packageBuilder.readPackage(httpResponse.getEntity().getContent());
+
+                    result.add(responsePackage);
+                    polls--;
+                }
+
+            } catch (HttpHostConnectException e) {
+                log.warn("could not connect to {} - skipping", replicationEndpoint.getUri());
+            }
+
+            return result;
+
+        }
+        catch (Exception ex) {
+            throw new ReplicationTransportException(ex);
+
+        }
+
+    }
+}

Propchange: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/SimpleHttpReplicationTransportHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/TransportEndpointStrategyType.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/TransportEndpointStrategyType.java?rev=1616781&r1=1616780&r2=1616781&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/TransportEndpointStrategyType.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/TransportEndpointStrategyType.java Fri Aug  8 14:32:47 2014
@@ -1,7 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.sling.replication.transport.impl;
 
 public enum TransportEndpointStrategyType {
-    FirstSuccessful,
-    OneSuccessful,
+    One,
     All
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/exporter/RemoteReplicationPackageExporter.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/exporter/RemoteReplicationPackageExporter.java?rev=1616781&r1=1616780&r2=1616781&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/exporter/RemoteReplicationPackageExporter.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/exporter/RemoteReplicationPackageExporter.java Fri Aug  8 14:32:47 2014
@@ -19,25 +19,24 @@
 package org.apache.sling.replication.transport.impl.exporter;
 
 import org.apache.felix.scr.annotations.*;
-import org.apache.http.HttpResponse;
 import org.apache.http.client.fluent.Executor;
-import org.apache.http.client.fluent.Request;
-import org.apache.http.conn.HttpHostConnectException;
 import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.replication.agent.ReplicationAgentConfiguration;
-import org.apache.sling.replication.communication.ReplicationActionType;
 import org.apache.sling.replication.communication.ReplicationEndpoint;
-import org.apache.sling.replication.communication.ReplicationHeader;
 import org.apache.sling.replication.communication.ReplicationRequest;
 import org.apache.sling.replication.serialization.*;
-import org.apache.sling.replication.transport.authentication.TransportAuthenticationContext;
+import org.apache.sling.replication.transport.ReplicationTransportHandler;
 import org.apache.sling.replication.transport.authentication.TransportAuthenticationProvider;
 import org.apache.sling.replication.transport.authentication.TransportAuthenticationProviderFactory;
-import org.apache.sling.replication.transport.authentication.impl.UserCredentialsTransportAuthenticationProviderFactory;
+import org.apache.sling.replication.transport.impl.MultipleEndpointReplicationTransportHandler;
+import org.apache.sling.replication.transport.impl.SimpleHttpReplicationTransportHandler;
+import org.apache.sling.replication.transport.impl.TransportEndpointStrategyType;
 import org.osgi.framework.BundleContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -60,30 +59,56 @@ public class RemoteReplicationPackageExp
     @Reference(name = "ReplicationPackageBuilder", policy = ReferencePolicy.STATIC)
     private ReplicationPackageBuilder packageBuilder;
 
-    TransportAuthenticationProvider<Executor, Executor>  transportAuthenticationProvider;
-    ReplicationEndpoint replicationEndpoint;
+    @Property(name = "poll items", description = "number of subsequent poll requests to make", intValue = 1)
+    private static final String POLL_ITEMS = "poll.items";
+
+    @Property(options = {
+            @PropertyOption(name = "All",
+                    value = "all endpoints"
+            ),
+            @PropertyOption(name = "One",
+                    value = "one endpoint"
+            )},
+            value = "One"
+    )
+    private static final String ENDPOINT_STRATEGY = ReplicationAgentConfiguration.ENDPOINT_STRATEGY;
+
+    int pollItems;
+    ReplicationTransportHandler transportHandler;
 
     @Activate
     protected void activate(BundleContext context, Map<String, ?> config) throws Exception {
 
         Map<String, String> authenticationProperties = PropertiesUtil.toMap(config.get(ReplicationAgentConfiguration.AUTHENTICATION_PROPERTIES), new String[0]);
 
-        transportAuthenticationProvider = (TransportAuthenticationProvider<Executor, Executor>) transportAuthenticationProviderFactory.createAuthenticationProvider(authenticationProperties);
+        TransportAuthenticationProvider<Executor, Executor> transportAuthenticationProvider = (TransportAuthenticationProvider<Executor, Executor>) transportAuthenticationProviderFactory.createAuthenticationProvider(authenticationProperties);
 
         String[] endpoints = PropertiesUtil.toStringArray(config.get(ReplicationAgentConfiguration.ENDPOINT), new String[0]);
 
-        replicationEndpoint = new ReplicationEndpoint(endpoints[0]);
+        pollItems = PropertiesUtil.toInteger(config.get(POLL_ITEMS), 1);
+
+        String endpointStrategyName = PropertiesUtil.toString(config.get(ReplicationAgentConfiguration.ENDPOINT_STRATEGY), "One");
+        TransportEndpointStrategyType transportEndpointStrategyType = TransportEndpointStrategyType.valueOf(endpointStrategyName);
 
+        List<ReplicationTransportHandler> transportHandlers = new ArrayList<ReplicationTransportHandler>();
+
+        for (String endpoint : endpoints) {
+            if (endpoint != null && endpoint.length() > 0) {
+                transportHandlers.add(new SimpleHttpReplicationTransportHandler(transportAuthenticationProvider,
+                        new ReplicationEndpoint(endpoint), packageBuilder, pollItems));
+            }
+        }
+        transportHandler = new MultipleEndpointReplicationTransportHandler(transportHandlers,
+                transportEndpointStrategyType);
     }
 
     @Deactivate
     protected void deactivate() {
     }
 
-
-    public ReplicationPackage exportPackage(ReplicationRequest replicationRequest) throws ReplicationPackageBuildingException {
+    public List<ReplicationPackage> exportPackage(ReplicationRequest replicationRequest) throws ReplicationPackageBuildingException {
         try {
-            return pollPackageFromEndpoint(replicationRequest, replicationEndpoint);
+            return transportHandler.retrievePackage();
         } catch (Exception e) {
             throw new ReplicationPackageBuildingException(e);
         }
@@ -92,38 +117,4 @@ public class RemoteReplicationPackageExp
     public ReplicationPackage exportPackageById(String replicationPackageId) {
         return packageBuilder.getPackage(replicationPackageId);
     }
-
-
-    private ReplicationPackage pollPackageFromEndpoint(ReplicationRequest replicationRequest, ReplicationEndpoint replicationEndpoint)
-            throws Exception {
-        log.debug("polling from {}", replicationEndpoint.getUri());
-
-
-        Executor executor = Executor.newInstance();
-        TransportAuthenticationContext context = new TransportAuthenticationContext();
-        context.addAttribute("endpoint", replicationEndpoint);
-        executor = transportAuthenticationProvider.authenticate(executor, context);
-
-        Request req = Request.Post(replicationEndpoint.getUri())
-                .addHeader(ReplicationHeader.ACTION.toString(), ReplicationActionType.POLL.getName())
-                .useExpectContinue();
-        // TODO : add queue parameter
-
-        // continuously requests package streams as long as type header is received with the response (meaning there's a package of a certain type)
-        HttpResponse httpResponse;
-        try {
-            httpResponse = executor.execute(req).returnResponse();
-            if (httpResponse.containsHeader(ReplicationHeader.TYPE.toString())) {
-                ReplicationPackage responsePackage = packageBuilder.readPackage(httpResponse.getEntity().getContent());
-
-                return responsePackage;
-            }
-        } catch (HttpHostConnectException e) {
-            log.warn("could not connect to {} - skipping", replicationEndpoint.getUri());
-        }
-
-        return null;
-
-    }
-
 }

Added: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/importer/AdvancedRemoteReplicationPackageImporter.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/importer/AdvancedRemoteReplicationPackageImporter.java?rev=1616781&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/importer/AdvancedRemoteReplicationPackageImporter.java (added)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/importer/AdvancedRemoteReplicationPackageImporter.java Fri Aug  8 14:32:47 2014
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.transport.impl.importer;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.felix.scr.annotations.*;
+import org.apache.http.client.fluent.Executor;
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.replication.agent.ReplicationAgentConfiguration;
+import org.apache.sling.replication.communication.ReplicationEndpoint;
+import org.apache.sling.replication.event.ReplicationEventFactory;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+import org.apache.sling.replication.serialization.ReplicationPackageImporter;
+import org.apache.sling.replication.serialization.ReplicationPackageReadingException;
+import org.apache.sling.replication.transport.ReplicationTransportHandler;
+import org.apache.sling.replication.transport.authentication.TransportAuthenticationProvider;
+import org.apache.sling.replication.transport.authentication.TransportAuthenticationProviderFactory;
+import org.apache.sling.replication.transport.impl.AdvancedHttpReplicationTransportHandler;
+import org.apache.sling.replication.transport.impl.MultipleEndpointReplicationTransportHandler;
+import org.apache.sling.replication.transport.impl.TransportEndpointStrategyType;
+import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link org.apache.sling.replication.serialization.ReplicationPackageImporter} supporting multiple
+ * endpoints and custom HTTP headers and body.
+ */
+@Component(label = "Advanced Remote Replication Package Importer", configurationFactory = true)
+@Service(value = ReplicationPackageImporter.class)
+public class AdvancedRemoteReplicationPackageImporter implements ReplicationPackageImporter {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Property(name = ReplicationAgentConfiguration.TRANSPORT_AUTHENTICATION_FACTORY)
+    @Reference(name = "TransportAuthenticationProviderFactory", policy = ReferencePolicy.DYNAMIC)
+    private TransportAuthenticationProviderFactory transportAuthenticationProviderFactory;
+
+    @Property(options = {
+            @PropertyOption(name = "All",
+                    value = "all endpoints"
+            ),
+            @PropertyOption(name = "One",
+                    value = "one endpoint"
+            )},
+            value = "One"
+    )
+    private static final String ENDPOINT_STRATEGY = ReplicationAgentConfiguration.ENDPOINT_STRATEGY;
+
+    @Property(boolValue = false)
+    private static final String USE_CUSTOM_HEADERS = "useCustomHeaders";
+
+    @Property(cardinality = 50)
+    private static final String CUSTOM_HEADERS = "customHeaders";
+
+    @Property(boolValue = false)
+    private static final String USE_CUSTOM_BODY = "useCustomBody";
+
+    @Property
+    private static final String CUSTOM_BODY = "customBody";
+
+
+    @Reference
+    private ReplicationEventFactory replicationEventFactory;
+
+
+    ReplicationTransportHandler transportHandler;
+
+    @Activate
+    protected void activate(BundleContext context, Map<String, ?> config) throws Exception {
+
+        Map<String, String> authenticationProperties = PropertiesUtil.toMap(config.get(ReplicationAgentConfiguration.AUTHENTICATION_PROPERTIES),
+                new String[0]);
+
+        TransportAuthenticationProvider<Executor, Executor> transportAuthenticationProvider = (TransportAuthenticationProvider<Executor, Executor>)
+                transportAuthenticationProviderFactory.createAuthenticationProvider(authenticationProperties);
+        String[] endpoints = PropertiesUtil.toStringArray(config.get(ReplicationAgentConfiguration.ENDPOINT), new String[0]);
+        String endpointStrategyName = PropertiesUtil.toString(config.get(ReplicationAgentConfiguration.ENDPOINT_STRATEGY),
+                TransportEndpointStrategyType.One.name());
+        TransportEndpointStrategyType transportEndpointStrategyType = TransportEndpointStrategyType.valueOf(endpointStrategyName);
+
+
+        boolean useCustomHeaders = PropertiesUtil.toBoolean(config.get(USE_CUSTOM_HEADERS), false);
+        String[] customHeaders = PropertiesUtil.toStringArray(config.get(CUSTOM_HEADERS), new String[0]);
+        boolean useCustomBody = PropertiesUtil.toBoolean(config.get(USE_CUSTOM_BODY), false);
+        String customBody = PropertiesUtil.toString(config.get(CUSTOM_BODY), "");
+
+
+        List<ReplicationTransportHandler> transportHandlers = new ArrayList<ReplicationTransportHandler>();
+
+        for (String endpoint : endpoints) {
+            if (endpoint != null && endpoint.length() > 0) {
+                transportHandlers.add(new AdvancedHttpReplicationTransportHandler(useCustomHeaders, customHeaders,
+                        useCustomBody, customBody,
+                        transportAuthenticationProvider,
+                        new ReplicationEndpoint(endpoint), null, -1));
+            }
+        }
+        transportHandler = new MultipleEndpointReplicationTransportHandler(transportHandlers,
+                transportEndpointStrategyType);
+
+    }
+
+
+    public boolean importPackage(ReplicationPackage replicationPackage) {
+        boolean result = false;
+        try {
+            transportHandler.deliverPackage(replicationPackage);
+            result = true;
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("failed delivery", e);
+            }
+        }
+        return result;
+    }
+
+    public ReplicationPackage readPackage(InputStream stream) throws ReplicationPackageReadingException {
+        return null;
+    }
+
+}

Propchange: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/transport/impl/importer/AdvancedRemoteReplicationPackageImporter.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message